Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 125 additions & 2 deletions src/core/dense_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ DenseSet::IteratorBase::IteratorBase(const DenseSet* owner, bool is_end)
curr_entry_ = nullptr;
owner_ = nullptr;
} else {
++owner_->num_iterators_;
curr_entry_ = &(*curr_list_);
owner->ExpireIfNeeded(nullptr, curr_entry_);

Expand Down Expand Up @@ -373,22 +374,139 @@ auto DenseSet::FindEmptyAround(uint32_t bid) -> ChainVectorIterator {
return entries_.end();
}

void DenseSet::Reserve(size_t sz) {
void DenseSet::Resize(size_t sz) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name is controversial because it describes some changes to the internal representation. Consider using AdjustCapacity()

sz = std::max<size_t>(sz, kMinSize);
// Don't shrink below the current number of elements
sz = std::max<size_t>(sz, size_);

sz = absl::bit_ceil(sz);
if (sz > entries_.size()) {
size_t prev_size = entries_.size();
entries_.resize(sz);
capacity_log_ = absl::bit_width(sz) - 1;
Grow(prev_size);
} else if (sz < entries_.size()) {
Shrink(sz);
}
}

void DenseSet::Shrink(size_t new_size) {
DCHECK(absl::has_single_bit(new_size));
DCHECK_GE(new_size, kMinSize);
DCHECK_LT(new_size, entries_.size());

// Guard against recursive shrink (e.g., if ExpireIfNeeded triggers Delete)
if (shrinking_) {
return;
}

// Check num_iterators_ to prevent shrink during iteration (would invalidate iterators)
if (num_iterators_ != 0) {
return;
}
Comment on lines +404 to +406
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks strange to me that shrinking depends on the iterators existing

Comment on lines +403 to +406
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I usually like higher level safeguards. On the other hand, having one like this, assumes that this is valid or possible behaviour, and even worse, if it occurs, it will happen unnoticed, because the function just silently returns

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, external ExpireIfNeeded calling Delete and then calling this function looks really dangerous. It holds references, the bucket id, etc. It also never holds an iterator directly, so none of your safeguards will kick in

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moreover the safeguards, can be hurting as well, consider:

for (auto it: set) {
  set->Delete(it)
}

Shrink won't be called even once even if the usage drops to 0%


shrinking_ = true;

size_t prev_size = entries_.size();
capacity_log_ = absl::bit_width(new_size) - 1;

// Clear all displaced flags first. During shrinking, we'll recalculate all bucket
// positions based on hash, so the displaced markers are no longer valid.
// This also ensures PushFront's DCHECK(!it->IsDisplaced()) passes.
for (size_t i = 0; i < prev_size; ++i) {
if (!entries_[i].IsEmpty()) {
entries_[i].ClearDisplaced();
}
}

// Process from high to low. This mirrors Grow's iteration order.
// Elements move to lower buckets (not yet processed) based on their hash.
for (long i = prev_size - 1; i >= 0; --i) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To my mind, it should be vice versa from 0 to size, because you move them into lower positions, to avoid double processing

DensePtr* curr = &entries_[i];
DensePtr* prev = nullptr;

do {
if (ExpireIfNeeded(prev, curr)) {
// If curr has disappeared due to expiry and prev was converted from Link
// to a regular DensePtr
if (prev && !prev->IsLink())
break;
}

if (curr->IsEmpty())
break;

void* ptr = curr->GetObject();

DCHECK(ptr != nullptr && ObjectAllocSize(ptr));

uint32_t new_bid = BucketId(ptr, 0);

// If the item stays in the current bucket, ensure it is not marked as
// displaced and move to the next item in the chain
if (new_bid == static_cast<uint32_t>(i)) {
curr->ClearDisplaced();
prev = curr;
curr = curr->Next();
if (curr == nullptr)
break;
} else {
// Element needs to move to a different bucket
auto dest = entries_.begin() + new_bid;
DensePtr dptr = *curr;

if (curr->IsObject()) {
if (prev) {
DCHECK(prev->IsLink());

DenseLinkKey* plink = prev->AsLink();
DCHECK(&plink->next == curr);

// We want to make *prev a DensePtr instead of DenseLink and we
// want to deallocate the link.
DensePtr tmp = DensePtr::From(plink);

// Important to transfer the ttl flag.
tmp.SetTtl(prev->HasTtl());
DCHECK(ObjectAllocSize(tmp.GetObject()));

FreeLink(plink);
// We deallocated the link, curr is invalid now.
curr = nullptr;
*prev = tmp;
} else {
// prev == nullptr
curr->Reset(); // Reset the root placeholder.
}
} else {
// !curr.IsObject
*curr = *dptr.Next();
DCHECK(!curr->IsEmpty());
}

DVLOG(2) << " Shrink: Moving from " << i << " to " << new_bid;
dptr.ClearDisplaced();
PushFront(dest, dptr);
}
} while (curr);
}

// Recount used buckets after shrinking
num_used_buckets_ = 0;
for (size_t i = 0; i < new_size; ++i) {
if (!entries_[i].IsEmpty()) {
++num_used_buckets_;
}
}

entries_.resize(new_size);
shrinking_ = false;
}

void DenseSet::Fill(DenseSet* other) const {
DCHECK(other->entries_.empty());

other->Reserve(UpperBoundSize());
other->Resize(UpperBoundSize());

constexpr unsigned kArrLen = 32;
CloneItem arr[kArrLen];
Expand Down Expand Up @@ -672,6 +790,11 @@ void DenseSet::Delete(DensePtr* prev, DensePtr* ptr) {
obj_malloc_used_ -= ObjectAllocSize(obj);
--size_;
ObjDelete(obj, false);

// Automatically shrink when utilization drops below 25%
if (entries_.size() > kMinSize && size_ < entries_.size() / 4) {
Shrink(entries_.size() / 2);
}
}

DenseSet::ChainVectorIterator DenseSet::GetRandomChain() {
Expand Down
41 changes: 40 additions & 1 deletion src/core/dense_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,39 @@ class DenseSet {
public:
IteratorBase(DenseSet* owner, ChainVectorIterator list_it, DensePtr* e)
: owner_(owner), curr_list_(list_it), curr_entry_(e) {
if (owner_) {
++owner_->num_iterators_;
}
}

~IteratorBase() {
if (owner_) {
--owner_->num_iterators_;
}
}

// Copy constructor - increment iterator count
IteratorBase(const IteratorBase& other)
: owner_(other.owner_), curr_list_(other.curr_list_), curr_entry_(other.curr_entry_) {
if (owner_) {
++owner_->num_iterators_;
}
}
Comment on lines +196 to +201
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can call upper base constructor


// Copy assignment - handle iterator counts
IteratorBase& operator=(const IteratorBase& other) {
if (this != &other) {
if (owner_) {
--owner_->num_iterators_;
}
owner_ = other.owner_;
curr_list_ = other.curr_list_;
curr_entry_ = other.curr_entry_;
if (owner_) {
++owner_->num_iterators_;
}
}
return *this;
}

// returns the expiry time of the current entry or UINT32_MAX if no ttl is set.
Expand Down Expand Up @@ -254,7 +287,10 @@ class DenseSet {
using ItemCb = std::function<void(const void*)>;

uint32_t Scan(uint32_t cursor, const ItemCb& cb) const;
void Reserve(size_t sz);

// Resizes the table to the specified size. Can both grow and shrink.
// The size will be rounded up to the nearest power of 2, with minimum of kMinSize.
void Resize(size_t sz);

void Fill(DenseSet* other) const;

Expand Down Expand Up @@ -367,6 +403,7 @@ class DenseSet {
// belong to given bid
bool NoItemBelongsBucket(uint32_t bid) const;
void Grow(size_t prev_size);
void Shrink(size_t new_size);

// ============ Pseudo Linked List Functions for interacting with Chains ==================
size_t PushFront(ChainVectorIterator, void* obj, bool has_ttl);
Expand Down Expand Up @@ -418,6 +455,8 @@ class DenseSet {
uint32_t time_now_ = 0;

mutable bool expiration_used_ = false;
mutable bool shrinking_ = false; // Guard against recursive shrink
mutable uint32_t num_iterators_ = 0; // Number of active iterators
};

inline void* DenseSet::FindInternal(const void* obj, uint64_t hashcode, uint32_t cookie) const {
Expand Down
2 changes: 1 addition & 1 deletion src/core/sorted_map.cc
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,7 @@ size_t SortedMap::MallocSize() const {
}

bool SortedMap::Reserve(size_t sz) {
score_map->Reserve(sz);
score_map->Resize(sz);
return true;
}

Expand Down
2 changes: 1 addition & 1 deletion src/core/string_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ unsigned StringSet::AddMany(absl::Span<std::string_view> span, uint32_t ttl_sec,
std::string_view views[kMaxBatchLen];
unsigned res = 0;
if (BucketCount() < span.size()) {
Reserve(span.size());
Resize(span.size());
}

while (span.size() >= kMaxBatchLen) {
Expand Down
Loading
Loading