diff --git a/src/core/expire_period.h b/src/core/expire_period.h index 6f8511eb2f36..a73ee040558d 100644 --- a/src/core/expire_period.h +++ b/src/core/expire_period.h @@ -8,9 +8,13 @@ namespace dfly { +// ExpirePeriod encapsulates the expiration period of a key. +// It can represent periods in milliseconds up to ~49 days and in seconds up to ~136 years. +// If value in milliseconds is too high, it switches to seconds precision. +// And if it's still too high, it silently clamps to the max value. class ExpirePeriod { public: - static constexpr size_t kMaxGenId = 15; + static constexpr uint32_t kMaxGenId = 3; ExpirePeriod() : val_(0), gen_(0), precision_(0) { static_assert(sizeof(ExpirePeriod) == 8); // TODO @@ -35,28 +39,30 @@ class ExpirePeriod { void Set(uint64_t ms); - bool is_second_precision() { return precision_ == 1;} + bool is_second_precision() const { + return precision_ == 1; + } private: - uint64_t val_ : 59; - uint64_t gen_ : 4; - uint64_t precision_ : 1; // 0 - ms, 1 - sec. + uint32_t val_; + uint8_t gen_ : 2; // generation id, at most kMaxGenId. + uint8_t precision_ : 1; // 0 - ms, 1 - sec. }; inline void ExpirePeriod::Set(uint64_t ms) { - constexpr uint64_t kBarrier = (1ULL << 48); - - if (ms < kBarrier) { - val_ = ms; - precision_ = 0; // ms + if (ms < UINT32_MAX) { + val_ = ms; // about 49 days in ms. + precision_ = 0; // ms return; } - precision_ = 1; - if (ms < kBarrier << 10) { - ms = (ms + 500) / 1000; // seconds + precision_ = 1; // seconds + if (ms < UINT64_MAX / 2) { + ms = (ms + 500) / 1000; + val_ = ms >= UINT32_MAX ? UINT32_MAX - 1 : ms; + } else { + val_ = UINT32_MAX - 1; } - val_ = ms >= kBarrier ? 
kBarrier - 1 : ms; } } // namespace dfly diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index cabbdf25de7a..8dc7259ce378 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -398,6 +398,8 @@ class DbSlice::PrimeBumpPolicy { DbSlice::DbSlice(uint32_t index, bool cache_mode, EngineShard* owner) : shard_id_(index), cache_mode_(cache_mode), + expire_allowed_(true), + expire_base_index_(0), owner_(owner), client_tracking_map_(owner->memory_resource()) { db_arr_.emplace_back(); diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 3a1933b57a9f..571532fbfd47 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -252,11 +252,11 @@ class DbSlice { } int64_t ExpireTime(const ExpirePeriod& val) const { - return expire_base_[0] + val.duration_ms(); + return expire_base_[val.generation_id()] + val.duration_ms(); } ExpirePeriod FromAbsoluteTime(uint64_t time_ms) const { - return ExpirePeriod{time_ms - expire_base_[0]}; + return ExpirePeriod{time_ms - expire_base_[expire_base_index_], expire_base_index_}; } struct ItAndUpdater { @@ -607,11 +607,11 @@ class DbSlice { ShardId shard_id_; uint8_t cache_mode_ : 1; - + uint8_t expire_allowed_ : 1; + uint8_t expire_base_index_ : 1; EngineShard* owner_; int64_t expire_base_[2]; // Used for expire logic, represents a real clock. - bool expire_allowed_ = true; uint64_t version_ = 1; // Used to version entries in the PrimeTable. uint64_t next_moved_id_ = 1; diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc index 945d40350492..3fdd815c8833 100644 --- a/src/server/engine_shard.cc +++ b/src/server/engine_shard.cc @@ -301,6 +301,8 @@ std::optional EngineShard::DoDefrag(CollectPageStats collect return page_usage.CollectedStats(); } +constexpr uint32_t kRunAtLowPriority = 0u; + // the memory defragmentation task is as follow: // 1. Check if memory usage is high enough // 2. 
Check if diff between commited and used memory is high enough @@ -310,7 +312,6 @@ std::optional EngineShard::DoDefrag(CollectPageStats collect // priority. // otherwise lower the task priority so that it would not use the CPU when not required uint32_t EngineShard::DefragTask() { - constexpr uint32_t kRunAtLowPriority = 0u; if (!namespaces) { return kRunAtLowPriority; } @@ -326,6 +327,10 @@ uint32_t EngineShard::DefragTask() { return 6; // priority. } +uint32_t EngineShard::UpdateExpiresTask() { + return kRunAtLowPriority; +} + EngineShard::EngineShard(util::ProactorBase* pb, mi_heap_t* heap) : txq_([](const Transaction* t) { return t->txid(); }), queue_(kQueueLen, 1, 1), @@ -347,6 +352,8 @@ void EngineShard::Shutdown() { void EngineShard::StopPeriodicFiber() { ProactorBase::me()->RemoveOnIdleTask(defrag_task_); + ProactorBase::me()->RemoveOnIdleTask(update_expire_base_task_); + fiber_heartbeat_periodic_done_.Notify(); if (fiber_heartbeat_periodic_.IsJoinable()) { fiber_heartbeat_periodic_.Join(); @@ -394,6 +401,7 @@ void EngineShard::StartPeriodicHeartbeatFiber(util::ProactorBase* pb) { RunFPeriodically(heartbeat, period_ms, "heartbeat", &fiber_heartbeat_periodic_done_); }); defrag_task_ = pb->AddOnIdleTask([this]() { return DefragTask(); }); + update_expire_base_task_ = pb->AddOnIdleTask([this]() { return UpdateExpiresTask(); }); } void EngineShard::StartPeriodicShardHandlerFiber(util::ProactorBase* pb, diff --git a/src/server/engine_shard.h b/src/server/engine_shard.h index ce0a211f7959..6707634a4a82 100644 --- a/src/server/engine_shard.h +++ b/src/server/engine_shard.h @@ -257,6 +257,7 @@ class EngineShard { // context of the controlling thread will access this shard! 
// -------------------------------------------------------------------------- uint32_t DefragTask(); + uint32_t UpdateExpiresTask(); TxQueue txq_; TaskQueue queue_, queue2_; @@ -283,7 +284,9 @@ class EngineShard { journal::Journal* journal_ = nullptr; IntentLock shard_lock_; - uint32_t defrag_task_ = 0; + // Idle tasks. + uint32_t defrag_task_ = 0, update_expire_base_task_ = 0; + EvictionTaskState eviction_state_; // Used on eviction fiber util::fb2::Fiber fiber_heartbeat_periodic_; util::fb2::Done fiber_heartbeat_periodic_done_;