Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 20 additions & 14 deletions src/core/expire_period.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@

namespace dfly {

// ExpirePeriod encapsulates the expiration period of a key.
// It can represent periods in milliseconds up to ~49 days and in seconds up to ~136 years.
// If value in milliseconds is too high, it switches to seconds precision.
// And if it's still too high, it silently clamps to the max value.
class ExpirePeriod {
public:
static constexpr size_t kMaxGenId = 15;
static constexpr uint32_t kMaxGenId = 15;

ExpirePeriod() : val_(0), gen_(0), precision_(0) {
static_assert(sizeof(ExpirePeriod) == 8); // TODO
Expand All @@ -35,28 +39,30 @@ class ExpirePeriod {

void Set(uint64_t ms);

bool is_second_precision() { return precision_ == 1;}
bool is_second_precision() const {
return precision_ == 1;
}

private:
uint64_t val_ : 59;
uint64_t gen_ : 4;
uint64_t precision_ : 1; // 0 - ms, 1 - sec.
uint32_t val_;
uint8_t gen_ : 2; // generation id.
uint8_t precision_ : 1; // 0 - ms, 1 - sec.
Comment on lines +47 to +49
Copy link
Contributor

@dranikpg dranikpg Sep 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if you're optimizing with all those bitfields, but the struct is not packed either way 🙂
image

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know the memory usage in this PR does not go down. It's not the goal here. I want first to introduce all the supporting functionality to support 5 bytes expire offset and then eliminate the expire table completely by moving 5 byte expire value into PrimeValue. This will reduce the memory usage for TTL enabled use-cases but will increase it for use-cases that do not use TTLs. This is why I want to use 5 bytes for expire value so we could reduce the negative impact for these use-cases.

In order to support a 5-bytes expire offset, we must model the absolute time as base + period because we must update base from time to time, otherwise after 49 days of process being run you won't be able to represent a millisecond precision with the old base. And since updating the base needs to be done gradually (by traversing the whole table), we need to have multiple bases to support the transition state.

};

inline void ExpirePeriod::Set(uint64_t ms) {
constexpr uint64_t kBarrier = (1ULL << 48);

if (ms < kBarrier) {
val_ = ms;
precision_ = 0; // ms
if (ms < UINT32_MAX) {
val_ = ms; // about 49 days in ms.
precision_ = 0; // ms
return;
}

precision_ = 1;
if (ms < kBarrier << 10) {
ms = (ms + 500) / 1000; // seconds
precision_ = 1; // seconds
if (ms < UINT64_MAX / 2) {
ms = (ms + 500) / 1000;
val_ = ms >= UINT32_MAX ? UINT32_MAX - 1 : ms;
} else {
val_ = UINT32_MAX - 1;
}
Comment on lines +59 to 65
Copy link
Contributor

@dranikpg dranikpg Sep 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand the if with int64_t::max / 2, why not just calculate the seconds value and see if it fits into int32_t 👀 Just check to divide first and then add the remainder > 0 (I assume the if is about this)

Copy link
Collaborator Author

@romange romange Sep 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

max64/2 is arbitrary, I could also check max64/1024. the goal is to prevent from (ms + 500) to overflow when we check if https://github.com/dragonflydb/dragonfly/pull/5797/files#diff-260e4d827ec66db63761ebe27e45792b6e2c8f9262f22d4a07e567c3fbfb5e40R61 fits uint32_t

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just a little less intuitive

val_ = ms >= kBarrier ? kBarrier - 1 : ms;
}

} // namespace dfly
2 changes: 2 additions & 0 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,8 @@ class DbSlice::PrimeBumpPolicy {
DbSlice::DbSlice(uint32_t index, bool cache_mode, EngineShard* owner)
: shard_id_(index),
cache_mode_(cache_mode),
expire_allowed_(true),
expire_base_index_(0),
owner_(owner),
client_tracking_map_(owner->memory_resource()) {
db_arr_.emplace_back();
Expand Down
8 changes: 4 additions & 4 deletions src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,11 +252,11 @@ class DbSlice {
}

int64_t ExpireTime(const ExpirePeriod& val) const {
return expire_base_[0] + val.duration_ms();
return expire_base_[val.generation_id()] + val.duration_ms();
}

ExpirePeriod FromAbsoluteTime(uint64_t time_ms) const {
return ExpirePeriod{time_ms - expire_base_[0]};
return ExpirePeriod{time_ms - expire_base_[expire_base_index_], expire_base_index_};
}

struct ItAndUpdater {
Expand Down Expand Up @@ -607,11 +607,11 @@ class DbSlice {

ShardId shard_id_;
uint8_t cache_mode_ : 1;

uint8_t expire_allowed_ : 1;
uint8_t expire_base_index_ : 1;
Comment on lines +610 to +611
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you like this so much 😆

EngineShard* owner_;

int64_t expire_base_[2]; // Used for expire logic, represents a real clock.
bool expire_allowed_ = true;

uint64_t version_ = 1; // Used to version entries in the PrimeTable.
uint64_t next_moved_id_ = 1;
Expand Down
10 changes: 9 additions & 1 deletion src/server/engine_shard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,8 @@ std::optional<CollectedPageStats> EngineShard::DoDefrag(CollectPageStats collect
return page_usage.CollectedStats();
}

constexpr uint32_t kRunAtLowPriority = 0u;

// the memory defragmentation task is as follow:
// 1. Check if memory usage is high enough
// 2. Check if diff between commited and used memory is high enough
Expand All @@ -310,7 +312,6 @@ std::optional<CollectedPageStats> EngineShard::DoDefrag(CollectPageStats collect
// priority.
// otherwise lower the task priority so that it would not use the CPU when not required
uint32_t EngineShard::DefragTask() {
constexpr uint32_t kRunAtLowPriority = 0u;
if (!namespaces) {
return kRunAtLowPriority;
}
Expand All @@ -326,6 +327,10 @@ uint32_t EngineShard::DefragTask() {
return 6; // priority.
}

uint32_t EngineShard::UpdateExpiresTask() {
return kRunAtLowPriority;
}

EngineShard::EngineShard(util::ProactorBase* pb, mi_heap_t* heap)
: txq_([](const Transaction* t) { return t->txid(); }),
queue_(kQueueLen, 1, 1),
Expand All @@ -347,6 +352,8 @@ void EngineShard::Shutdown() {

void EngineShard::StopPeriodicFiber() {
ProactorBase::me()->RemoveOnIdleTask(defrag_task_);
ProactorBase::me()->RemoveOnIdleTask(update_expire_base_task_);

fiber_heartbeat_periodic_done_.Notify();
if (fiber_heartbeat_periodic_.IsJoinable()) {
fiber_heartbeat_periodic_.Join();
Expand Down Expand Up @@ -394,6 +401,7 @@ void EngineShard::StartPeriodicHeartbeatFiber(util::ProactorBase* pb) {
RunFPeriodically(heartbeat, period_ms, "heartbeat", &fiber_heartbeat_periodic_done_);
});
defrag_task_ = pb->AddOnIdleTask([this]() { return DefragTask(); });
update_expire_base_task_ = pb->AddOnIdleTask([this]() { return UpdateExpiresTask(); });
}

void EngineShard::StartPeriodicShardHandlerFiber(util::ProactorBase* pb,
Expand Down
5 changes: 4 additions & 1 deletion src/server/engine_shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ class EngineShard {
// context of the controlling thread will access this shard!
// --------------------------------------------------------------------------
uint32_t DefragTask();
uint32_t UpdateExpiresTask();

TxQueue txq_;
TaskQueue queue_, queue2_;
Expand All @@ -283,7 +284,9 @@ class EngineShard {
journal::Journal* journal_ = nullptr;
IntentLock shard_lock_;

uint32_t defrag_task_ = 0;
// Idle tasks.
uint32_t defrag_task_ = 0, update_expire_base_task_ = 0;

EvictionTaskState eviction_state_; // Used on eviction fiber
util::fb2::Fiber fiber_heartbeat_periodic_;
util::fb2::Done fiber_heartbeat_periodic_done_;
Expand Down
Loading