diff --git a/src/core/expire_period.h b/src/core/expire_period.h index 6f8511eb2f36..55d9c960fdfa 100644 --- a/src/core/expire_period.h +++ b/src/core/expire_period.h @@ -10,13 +10,13 @@ namespace dfly { class ExpirePeriod { public: - static constexpr size_t kMaxGenId = 15; + static constexpr std::size_t kMaxGenId = 15; ExpirePeriod() : val_(0), gen_(0), precision_(0) { static_assert(sizeof(ExpirePeriod) == 8); // TODO } - explicit ExpirePeriod(uint64_t ms, unsigned gen = 0) : ExpirePeriod() { + explicit ExpirePeriod(uint64_t ms, unsigned gen = 0) : gen_(gen) { Set(ms); } @@ -35,7 +35,9 @@ class ExpirePeriod { void Set(uint64_t ms); - bool is_second_precision() { return precision_ == 1;} + bool is_second_precision() { + return precision_ == 1; + } private: uint64_t val_ : 59; @@ -48,13 +50,13 @@ inline void ExpirePeriod::Set(uint64_t ms) { if (ms < kBarrier) { val_ = ms; - precision_ = 0; // ms + precision_ = 0; // ms return; } precision_ = 1; if (ms < kBarrier << 10) { - ms = (ms + 500) / 1000; // seconds + ms = (ms + 500) / 1000; // seconds } val_ = ms >= kBarrier ? kBarrier - 1 : ms; } diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 1aa1d3f8ca1c..7528aadb9794 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -398,6 +398,8 @@ class DbSlice::PrimeBumpPolicy { DbSlice::DbSlice(uint32_t index, bool cache_mode, EngineShard* owner) : shard_id_(index), cache_mode_(cache_mode), + expire_allowed_(1), + expire_gen_id_(0), owner_(owner), client_tracking_map_(owner->memory_resource()) { db_arr_.emplace_back(); @@ -1241,12 +1243,16 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato return {it, ExpireIterator{}}; } - // TODO: to employ multi-generation update of expire-base and the underlying values. int64_t expire_time = ExpireTime(expire_it->second); // Never do expiration on replica or if expiration is disabled or global lock was taken. if (int64_t(cntx.time_now_ms) < expire_time || owner_->IsReplica() || !expire_allowed_ || !shard_owner()->shard_lock()->Check(IntentLock::Mode::EXCLUSIVE)) { + // Update expiry if needed. + if (expire_it->second.generation_id() != expire_gen_id_) { + expire_it->second = FromAbsoluteTime(expire_time); + } + // Keeping the entry. return {it, expire_it}; } diff --git a/src/server/db_slice.h b/src/server/db_slice.h index ff44a087b234..fe440dae8203 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -234,8 +234,9 @@ class DbSlice { // Returns slot statistics for db 0. SlotStats GetSlotStats(SlotId sid) const; - void UpdateExpireBase(uint64_t now, unsigned generation) { - expire_base_[generation & 1] = now; + void NextExpireGen(uint64_t now_ms) { + expire_gen_id_ ^= 1; + expire_base_[expire_gen_id_] = now_ms; } void UpdateMemoryParams(int64_t budget, size_t bytes_per_object) { @@ -251,12 +252,17 @@ class DbSlice { return bytes_per_object_; } + // returns expire time in ms. int64_t ExpireTime(const ExpirePeriod& val) const { - return expire_base_[0] + val.duration_ms(); + return expire_base_[val.generation_id()] + val.duration_ms(); } ExpirePeriod FromAbsoluteTime(uint64_t time_ms) const { - return ExpirePeriod{time_ms - expire_base_[0]}; + return ExpirePeriod{time_ms - expire_base_[expire_gen_id_], expire_gen_id_}; + } + + unsigned expire_gen_id() const { + return expire_gen_id_; } struct ItAndUpdater { @@ -616,11 +622,17 @@ class DbSlice { ShardId shard_id_; uint8_t cache_mode_ : 1; + uint8_t expire_allowed_ : 1; + uint8_t expire_gen_id_ : 1; EngineShard* owner_; + // base time for computing expirations. + // the absolute expiration time is expire_base_ + relative_time. + // In order to allow rolling updates of expire_base_ we maintain two + // generations of expire_base_ and each expiration entry has a generation_id + // that tells which expire_base_ to use. int64_t expire_base_[2]; // Used for expire logic, represents a real clock. - bool expire_allowed_ = true; uint64_t version_ = 1; // Used to version entries in the PrimeTable. uint64_t next_moved_id_ = 1; diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index 9693529780f8..af0950adc6bd 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -872,7 +872,7 @@ optional DebugCmd::ParsePopulateArgs(CmdArgList args, } case FLAG_EXPIRE: { auto [min_ttl, max_ttl] = parser.Next(); - if (min_ttl >= max_ttl) { + if (min_ttl > max_ttl) { builder->SendError(kExpiryOutOfRange); (void)parser.TakeError(); return nullopt; @@ -1641,7 +1641,8 @@ void DebugCmd::DoPopulateBatch(const PopulateOptions& options, const PopulateBat if (options.expire_ttl_range.has_value()) { uint32_t start = options.expire_ttl_range->first; uint32_t end = options.expire_ttl_range->second; - uint32_t expire_ttl = rand() % (end - start) + start; + uint32_t expire_ttl = start + ((end > start) ? rand() % (end - start) : 0); + VLOG(1) << "set key " << key << " expire ttl as " << expire_ttl; auto cid = sf_.service().mutable_registry()->Find("EXPIRE"); absl::InlinedVector args; diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc index bf79d05d4be6..384650a6f9b1 100644 --- a/src/server/dragonfly_test.cc +++ b/src/server/dragonfly_test.cc @@ -953,6 +953,24 @@ TEST_F(DflyEngineTest, Huffman) { EXPECT_LT(metrics.heap_used_bytes, 14'000'000); // less than 15mb } +TEST_F(DflyEngineTest, RebaseExpire) { + // expire: two weeks in seconds + auto resp = Run({"debug", "populate", "20000", "key", "10", "EXPIRE", "1209600", "1209600"}); + EXPECT_EQ(resp, "OK"); + resp = Run({"EXPIRETIME", "key:42"}); + long exp_time = *resp.GetInt(); + EXPECT_GT(exp_time, 0); + AdvanceTime(2 * 24 * 3600 * 1000); // advance 2 days. + + // verify that all keys have been updated. + ExpectConditionWithinTimeout([&] { + auto metrics = GetMetrics(); + return metrics.shard_stats.total_update_expire_calls == 20'000; + }); + resp = Run({"EXPIRETIME", "key:42"}); + EXPECT_EQ(*resp.GetInt(), exp_time); +} + class DflyCommandAliasTest : public DflyEngineTest { protected: DflyCommandAliasTest() { diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc index ced29b09fcb6..e01bcdb29201 100644 --- a/src/server/engine_shard.cc +++ b/src/server/engine_shard.cc @@ -141,7 +141,7 @@ string EngineShard::TxQueueInfo::Format() const { } EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) { - static_assert(sizeof(Stats) == 96); + static_assert(sizeof(Stats) == 104); #define ADD(x) x += o.x @@ -157,6 +157,7 @@ EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) ADD(total_heartbeat_expired_bytes); ADD(total_heartbeat_expired_calls); ADD(total_migrated_keys); + ADD(total_update_expire_calls); #undef ADD return *this; @@ -301,6 +302,8 @@ std::optional EngineShard::DoDefrag(CollectPageStats collect return page_usage.CollectedStats(); } +constexpr uint32_t kRunAtLowPriority = 0u; + // the memory defragmentation task is as follow: // 1. Check if memory usage is high enough // 2. Check if diff between commited and used memory is high enough @@ -310,7 +313,6 @@ std::optional EngineShard::DoDefrag(CollectPageStats collect // priority. // otherwise lower the task priority so that it would not use the CPU when not required uint32_t EngineShard::DefragTask() { - constexpr uint32_t kRunAtLowPriority = 0u; if (!namespaces) { return kRunAtLowPriority; } @@ -326,6 +328,97 @@ uint32_t EngineShard::DefragTask() { return 6; // priority. } +// TODO: this is wrong. we can do better by updating the expire base in DeleteExpiredStep. +uint32_t EngineShard::UpdateExpiresTask() { + if (!namespaces) { + return kRunAtLowPriority; + } + + DVLOG(1) << "EngineShard::UpdateExpiresTask shard_id: " << shard_id_; + + DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id()); + uint64_t now_ms = GetCurrentTimeMs(); + + // measure the age of the current expire base. + unsigned base_age_sec = db_slice.FromAbsoluteTime(now_ms).duration_ms() / 1000; + + const uint64_t kLowThresholdSec = 3600 * 24 * 2; // 2 days + const uint64_t kHighThresholdSec = 3600 * 24 * 7; // 7 days + if (update_expire_state_ == nullptr) { + if (base_age_sec < kLowThresholdSec) { + // no need to update expire base if period is less than a few days. + return kRunAtLowPriority; + } + + db_slice.NextExpireGen(now_ms); + update_expire_state_ = new UpdateExpireState(); + VLOG(1) << "shard " << shard_id_ << " updated expire base to " << now_ms + << ", generation: " << db_slice.expire_gen_id(); + } + + DCHECK(update_expire_state_ != nullptr); + if (base_age_sec > kHighThresholdSec) { + LOG_EVERY_T(ERROR, 3600) << "Expire base age is very high: " << base_age_sec; + } + + if (update_expire_state_->db_index < db_slice.db_array_size()) { + unsigned current_gen_id = db_slice.expire_gen_id(); + + auto cb = [&](ExpireIterator it) { + if (it->second.generation_id() != current_gen_id) { + int64_t ms = db_slice.ExpireTime(it->second); + + // only update if the expire time is in the future. + // we rely on DeleteExpiredStep to delete expired keys because we may need to propagate + // deletions to the journal. + if (ms > int64_t(now_ms)) { + DVLOG(2) << "Update expire generation from " << it->second.generation_id() << " to " + << current_gen_id; + it->second = db_slice.FromAbsoluteTime(ms); + + DCHECK_EQ(current_gen_id, db_slice.expire_gen_id()); + DCHECK_EQ(ms, db_slice.ExpireTime(it->second)); + + stats_.total_update_expire_calls++; + } else { + update_expire_state_->stale_entries++; + } + } + }; + + auto& expire_table = db_slice.GetDBTable(update_expire_state_->db_index)->expire; + unsigned iters = 0; + do { + auto next = expire_table.Traverse(detail::DashCursor(update_expire_state_->cursor), cb); + if (next) { + update_expire_state_->cursor = next.token(); + } else { + // finished this db, move to the next + update_expire_state_->cursor = 0; + ++update_expire_state_->db_index; + while (update_expire_state_->db_index < db_slice.db_array_size() && + !db_slice.IsDbValid(update_expire_state_->db_index)) { + ++update_expire_state_->db_index; + } + } + } while (update_expire_state_->cursor && ++iters < 100); + } + + if (update_expire_state_->db_index >= db_slice.db_array_size()) { + if (update_expire_state_->stale_entries == 0) { + // We went over all the items and not stale items were found, we are done. + delete update_expire_state_; + update_expire_state_ = nullptr; + return kRunAtLowPriority; + } + + // Repeat the process if we found stale entries to update. + *update_expire_state_ = {}; + } + + return 5; // run again soon, moderate frequency. +} + EngineShard::EngineShard(util::ProactorBase* pb, mi_heap_t* heap) : txq_([](const Transaction* t) { return t->txid(); }), queue_(kQueueLen, 1, 1), @@ -347,6 +440,8 @@ void EngineShard::Shutdown() { void EngineShard::StopPeriodicFiber() { ProactorBase::me()->RemoveOnIdleTask(defrag_task_); + ProactorBase::me()->RemoveOnIdleTask(update_expire_base_task_); + fiber_heartbeat_periodic_done_.Notify(); if (fiber_heartbeat_periodic_.IsJoinable()) { fiber_heartbeat_periodic_.Join(); @@ -394,6 +489,7 @@ void EngineShard::StartPeriodicHeartbeatFiber(util::ProactorBase* pb) { RunFPeriodically(heartbeat, period_ms, "heartbeat", &fiber_heartbeat_periodic_done_); }); defrag_task_ = pb->AddOnIdleTask([this]() { return DefragTask(); }); + update_expire_base_task_ = pb->AddOnIdleTask([this]() { return UpdateExpiresTask(); }); } void EngineShard::StartPeriodicShardHandlerFiber(util::ProactorBase* pb, @@ -605,8 +701,10 @@ void EngineShard::Heartbeat() { // TODO: iterate over all namespaces DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id()); + // Skip heartbeat if we are serializing a big value static auto start = std::chrono::system_clock::now(); + // Skip heartbeat if global transaction is in process. // This is determined by attempting to check if shard lock can be acquired. const bool can_acquire_global_lock = shard_lock()->Check(IntentLock::Mode::EXCLUSIVE); @@ -620,6 +718,7 @@ void EngineShard::Heartbeat() { } return; } + start = std::chrono::system_clock::now(); if (!IsReplica()) { // Never run expiry/evictions on replica. diff --git a/src/server/engine_shard.h b/src/server/engine_shard.h index 201b67cfc74d..41ffa86b472a 100644 --- a/src/server/engine_shard.h +++ b/src/server/engine_shard.h @@ -49,6 +49,7 @@ class EngineShard { uint64_t total_heartbeat_expired_keys = 0; uint64_t total_heartbeat_expired_bytes = 0; uint64_t total_heartbeat_expired_calls = 0; + uint64_t total_update_expire_calls = 0; // cluster stats uint64_t total_migrated_keys = 0; @@ -258,6 +259,7 @@ class EngineShard { // context of the controlling thread will access this shard! // -------------------------------------------------------------------------- uint32_t DefragTask(); + uint32_t UpdateExpiresTask(); TxQueue txq_; TaskQueue queue_, queue2_; @@ -284,8 +286,18 @@ class EngineShard { journal::Journal* journal_ = nullptr; IntentLock shard_lock_; - uint32_t defrag_task_ = 0; + // Idle tasks. + uint32_t defrag_task_ = 0, update_expire_base_task_ = 0; + EvictionTaskState eviction_state_; // Used on eviction fiber + struct UpdateExpireState { + uint64_t cursor = 0; + DbIndex db_index = 0; + uint64_t stale_entries = 0; + }; + + UpdateExpireState* update_expire_state_ = nullptr; + util::fb2::Fiber fiber_heartbeat_periodic_; util::fb2::Done fiber_heartbeat_periodic_done_; diff --git a/src/server/namespaces.cc b/src/server/namespaces.cc index ad2cefa09295..e824592c4c51 100644 --- a/src/server/namespaces.cc +++ b/src/server/namespaces.cc @@ -22,7 +22,7 @@ Namespace::Namespace() { CHECK(es != nullptr); ShardId sid = es->shard_id(); shard_db_slices_[sid] = make_unique(sid, absl::GetFlag(FLAGS_cache_mode), es); - shard_db_slices_[sid]->UpdateExpireBase(absl::GetCurrentTimeNanos() / 1000000, 0); + shard_db_slices_[sid]->NextExpireGen(absl::GetCurrentTimeNanos() / 1000000); }); } diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc index df3436b13197..6e59785251f2 100644 --- a/src/server/test_utils.cc +++ b/src/server/test_utils.cc @@ -255,7 +255,7 @@ void BaseFamilyTest::ResetService() { TEST_current_time_ms = absl::GetCurrentTimeNanos() / 1000000; auto default_ns = &namespaces->GetDefaultNamespace(); auto cb = [&](EngineShard* s) { - default_ns->GetDbSlice(s->shard_id()).UpdateExpireBase(TEST_current_time_ms - 1000, 0); + default_ns->GetDbSlice(s->shard_id()).NextExpireGen(TEST_current_time_ms - 1000); }; shard_set->RunBriefInParallel(cb);