diff --git a/src/core/dense_set.cc b/src/core/dense_set.cc
index ab851dde97ec..7d77c7f491de 100644
--- a/src/core/dense_set.cc
+++ b/src/core/dense_set.cc
@@ -40,6 +40,7 @@ DenseSet::IteratorBase::IteratorBase(const DenseSet* owner, bool is_end)
     curr_entry_ = nullptr;
     owner_ = nullptr;
   } else {
+    ++owner_->num_iterators_;
     curr_entry_ = &(*curr_list_);
     owner->ExpireIfNeeded(nullptr, curr_entry_);
 
@@ -373,8 +374,10 @@ auto DenseSet::FindEmptyAround(uint32_t bid) -> ChainVectorIterator {
   return entries_.end();
 }
 
-void DenseSet::Reserve(size_t sz) {
+void DenseSet::Resize(size_t sz) {
   sz = std::max(sz, kMinSize);
+  // Don't shrink below the current number of elements
+  sz = std::max(sz, size_);
 
   sz = absl::bit_ceil(sz);
   if (sz > entries_.size()) {
@@ -382,13 +385,128 @@ void DenseSet::Reserve(size_t sz) {
     entries_.resize(sz);
     capacity_log_ = absl::bit_width(sz) - 1;
     Grow(prev_size);
+  } else if (sz < entries_.size()) {
+    Shrink(sz);
   }
 }
 
+void DenseSet::Shrink(size_t new_size) {
+  DCHECK(absl::has_single_bit(new_size));
+  DCHECK_GE(new_size, kMinSize);
+  DCHECK_LT(new_size, entries_.size());
+
+  // Guard against recursive shrink (e.g., if ExpireIfNeeded triggers Delete)
+  if (shrinking_) {
+    return;
+  }
+
+  // Check num_iterators_ to prevent shrink during iteration (would invalidate iterators)
+  if (num_iterators_ != 0) {
+    return;
+  }
+
+  shrinking_ = true;
+
+  size_t prev_size = entries_.size();
+  capacity_log_ = absl::bit_width(new_size) - 1;
+
+  // Clear all displaced flags first. During shrinking, we'll recalculate all bucket
+  // positions based on hash, so the displaced markers are no longer valid.
+  // This also ensures PushFront's DCHECK(!it->IsDisplaced()) passes.
+  for (size_t i = 0; i < prev_size; ++i) {
+    if (!entries_[i].IsEmpty()) {
+      entries_[i].ClearDisplaced();
+    }
+  }
+
+  // Process from high to low. This mirrors Grow's iteration order.
+  // Elements move to lower buckets (not yet processed) based on their hash.
+  for (long i = prev_size - 1; i >= 0; --i) {
+    DensePtr* curr = &entries_[i];
+    DensePtr* prev = nullptr;
+
+    do {
+      if (ExpireIfNeeded(prev, curr)) {
+        // If curr has disappeared due to expiry and prev was converted from Link
+        // to a regular DensePtr
+        if (prev && !prev->IsLink())
+          break;
+      }
+
+      if (curr->IsEmpty())
+        break;
+
+      void* ptr = curr->GetObject();
+
+      DCHECK(ptr != nullptr && ObjectAllocSize(ptr));
+
+      uint32_t new_bid = BucketId(ptr, 0);
+
+      // If the item stays in the current bucket, ensure it is not marked as
+      // displaced and move to the next item in the chain
+      if (new_bid == static_cast<uint32_t>(i)) {
+        curr->ClearDisplaced();
+        prev = curr;
+        curr = curr->Next();
+        if (curr == nullptr)
+          break;
+      } else {
+        // Element needs to move to a different bucket
+        auto dest = entries_.begin() + new_bid;
+        DensePtr dptr = *curr;
+
+        if (curr->IsObject()) {
+          if (prev) {
+            DCHECK(prev->IsLink());
+
+            DenseLinkKey* plink = prev->AsLink();
+            DCHECK(&plink->next == curr);
+
+            // We want to make *prev a DensePtr instead of DenseLink and we
+            // want to deallocate the link.
+            DensePtr tmp = DensePtr::From(plink);
+
+            // Important to transfer the ttl flag.
+            tmp.SetTtl(prev->HasTtl());
+            DCHECK(ObjectAllocSize(tmp.GetObject()));
+
+            FreeLink(plink);
+            // We deallocated the link, curr is invalid now.
+            curr = nullptr;
+            *prev = tmp;
+          } else {
+            // prev == nullptr
+            curr->Reset();  // Reset the root placeholder.
+          }
+        } else {
+          // !curr.IsObject
+          *curr = *dptr.Next();
+          DCHECK(!curr->IsEmpty());
+        }
+
+        DVLOG(2) << " Shrink: Moving from " << i << " to " << new_bid;
+        dptr.ClearDisplaced();
+        PushFront(dest, dptr);
+      }
+    } while (curr);
+  }
+
+  // Recount used buckets after shrinking
+  num_used_buckets_ = 0;
+  for (size_t i = 0; i < new_size; ++i) {
+    if (!entries_[i].IsEmpty()) {
+      ++num_used_buckets_;
+    }
+  }
+
+  entries_.resize(new_size);
+  shrinking_ = false;
+}
+
 void DenseSet::Fill(DenseSet* other) const {
   DCHECK(other->entries_.empty());
 
-  other->Reserve(UpperBoundSize());
+  other->Resize(UpperBoundSize());
 
   constexpr unsigned kArrLen = 32;
   CloneItem arr[kArrLen];
@@ -672,6 +790,11 @@ void DenseSet::Delete(DensePtr* prev, DensePtr* ptr) {
   obj_malloc_used_ -= ObjectAllocSize(obj);
   --size_;
   ObjDelete(obj, false);
+
+  // Automatically shrink when utilization drops below 25%
+  if (entries_.size() > kMinSize && size_ < entries_.size() / 4) {
+    Shrink(entries_.size() / 2);
+  }
 }
 
 DenseSet::ChainVectorIterator DenseSet::GetRandomChain() {
diff --git a/src/core/dense_set.h b/src/core/dense_set.h
index bd8c7898622f..2ef7a8b9a02d 100644
--- a/src/core/dense_set.h
+++ b/src/core/dense_set.h
@@ -181,6 +181,39 @@ class DenseSet {
    public:
     IteratorBase(DenseSet* owner, ChainVectorIterator list_it, DensePtr* e)
         : owner_(owner), curr_list_(list_it), curr_entry_(e) {
+      if (owner_) {
+        ++owner_->num_iterators_;
+      }
+    }
+
+    ~IteratorBase() {
+      if (owner_) {
+        --owner_->num_iterators_;
+      }
+    }
+
+    // Copy constructor - increment iterator count
+    IteratorBase(const IteratorBase& other)
+        : owner_(other.owner_), curr_list_(other.curr_list_), curr_entry_(other.curr_entry_) {
+      if (owner_) {
+        ++owner_->num_iterators_;
+      }
+    }
+
+    // Copy assignment - handle iterator counts
+    IteratorBase& operator=(const IteratorBase& other) {
+      if (this != &other) {
+        if (owner_) {
+          --owner_->num_iterators_;
+        }
+        owner_ = other.owner_;
+        curr_list_ = other.curr_list_;
+        curr_entry_ = other.curr_entry_;
+        if (owner_) {
+          ++owner_->num_iterators_;
+        }
+      }
+      return *this;
     }
 
     // returns the expiry time of the current entry or UINT32_MAX if no ttl is set.
@@ -254,7 +287,10 @@ class DenseSet {
   using ItemCb = std::function;
 
   uint32_t Scan(uint32_t cursor, const ItemCb& cb) const;
-  void Reserve(size_t sz);
+
+  // Resizes the table to the specified size. Can both grow and shrink.
+  // The size will be rounded up to the nearest power of 2, with a minimum of kMinSize.
+  void Resize(size_t sz);
 
   void Fill(DenseSet* other) const;
 
@@ -367,6 +403,7 @@ class DenseSet {
   // belong to given bid
   bool NoItemBelongsBucket(uint32_t bid) const;
   void Grow(size_t prev_size);
+  void Shrink(size_t new_size);
 
   // ============ Pseudo Linked List Functions for interacting with Chains ==================
   size_t PushFront(ChainVectorIterator, void* obj, bool has_ttl);
@@ -418,6 +455,8 @@ class DenseSet {
   uint32_t time_now_ = 0;
 
   mutable bool expiration_used_ = false;
+  mutable bool shrinking_ = false;      // Guard against recursive shrink
+  mutable uint32_t num_iterators_ = 0;  // Number of active iterators
 };
 
 inline void* DenseSet::FindInternal(const void* obj, uint64_t hashcode, uint32_t cookie) const {
diff --git a/src/core/sorted_map.cc b/src/core/sorted_map.cc
index 57b7d56b36bc..1c1dd1e2b3d7 100644
--- a/src/core/sorted_map.cc
+++ b/src/core/sorted_map.cc
@@ -862,7 +862,7 @@ size_t SortedMap::MallocSize() const {
 }
 
 bool SortedMap::Reserve(size_t sz) {
-  score_map->Reserve(sz);
+  score_map->Resize(sz);
   return true;
 }
 
diff --git a/src/core/string_set.cc b/src/core/string_set.cc
index a104f2cba2fe..895710446a83 100644
--- a/src/core/string_set.cc
+++ b/src/core/string_set.cc
@@ -60,7 +60,7 @@ unsigned StringSet::AddMany(absl::Span span, uint32_t ttl_sec,
   std::string_view views[kMaxBatchLen];
   unsigned res = 0;
   if (BucketCount() < span.size()) {
-    Reserve(span.size());
+    Resize(span.size());
   }
 
   while (span.size() >= kMaxBatchLen) {
diff --git a/src/core/string_set_test.cc b/src/core/string_set_test.cc
index 3f0403a4da35..a027c4f09750 100644
--- a/src/core/string_set_test.cc
+++ b/src/core/string_set_test.cc
@@ -487,7 +487,7 @@ TEST_F(StringSetTest, Ttl) {
 TEST_F(StringSetTest, Grow) {
   for (size_t j = 0; j < 10; ++j) {
     for (size_t i = 0; i < 4098; ++i) {
-      ss_->Reserve(generator_() % 256);
+      ss_->Resize(generator_() % 256);
       auto str = random_string(generator_, 3);
       ss_->Add(str);
     }
@@ -495,7 +495,7 @@ TEST_F(StringSetTest, Grow) {
   }
 }
 
-TEST_F(StringSetTest, Reserve) {
+TEST_F(StringSetTest, Resize) {
   vector<string> strs;
 
   for (size_t i = 0; i < 10; ++i) {
@@ -504,7 +504,7 @@ TEST_F(StringSetTest, Reserve) {
   }
 
   for (size_t j = 2; j < 20; j += 3) {
-    ss_->Reserve(j * 20);
+    ss_->Resize(j * 20);
     for (size_t i = 0; i < 10; ++i) {
      ASSERT_TRUE(ss_->Contains(strs[i]));
    }
@@ -543,14 +543,14 @@ void BM_Clone(benchmark::State& state) {
     string str = random_string(generator, 10);
     ss1.Add(str);
   }
-  ss2.Reserve(ss1.UpperBoundSize());
+  ss2.Resize(ss1.UpperBoundSize());
   while (state.KeepRunning()) {
     for (auto src : ss1) {
       ss2.Add(src);
     }
     state.PauseTiming();
     ss2.Clear();
-    ss2.Reserve(ss1.UpperBoundSize());
+    ss2.Resize(ss1.UpperBoundSize());
     state.ResumeTiming();
   }
 }
@@ -601,14 +601,14 @@ void BM_Add(benchmark::State& state) {
     string str = random_string(generator, keySize);
     strs.push_back(str);
   }
-  ss.Reserve(elems);
+  ss.Resize(elems);
   while (state.KeepRunning()) {
     for (auto& str : strs)
       ss.Add(str);
     state.PauseTiming();
     state.counters["Memory_Used"] = memUsed(ss);
     ss.Clear();
-    ss.Reserve(elems);
+    ss.Resize(elems);
     state.ResumeTiming();
   }
 }
@@ -626,7 +626,7 @@ void BM_AddMany(benchmark::State& state) {
     string str = random_string(generator, keySize);
     strs.push_back(str);
   }
-  ss.Reserve(elems);
+  ss.Resize(elems);
   vector<string_view> svs;
   for (const auto& str : strs) {
     svs.push_back(str);
   }
@@ -637,7 +637,7 @@ void BM_AddMany(benchmark::State& state) {
     CHECK_EQ(ss.UpperBoundSize(), elems);
     state.counters["Memory_Used"] = memUsed(ss);
     ss.Clear();
-    ss.Reserve(elems);
+    ss.Resize(elems);
     state.ResumeTiming();
   }
 }
@@ -735,7 +735,7 @@ void BM_Spop1000(benchmark::State& state) {
     state.PauseTiming();
     StringSet tmp;
     src.Fill(&tmp);
-    tmp.Reserve(elems * sparseness);
+    tmp.Resize(elems * sparseness);
     state.ResumeTiming();
     for (int i = 0; i < 1000; ++i) {
       tmp.Pop();
@@ -803,4 +803,148 @@ TEST_F(StringSetTest, TransferTTLFlagLinkToObjectOnDelete) {
   EXPECT_EQ(1u, it.ExpiryTime());
 }
 
+TEST_F(StringSetTest, BasicShrink) {
+  // Add elements and then grow to have extra capacity
+  constexpr size_t num_strs = 32;
+  vector<string> strs;
+  for (size_t i = 0; i < num_strs; ++i) {
+    strs.push_back(random_string(generator_, 10));
+    EXPECT_TRUE(ss_->Add(strs.back()));
+  }
+
+  // Grow to a larger size
+  ss_->Resize(256);
+  size_t original_bucket_count = ss_->BucketCount();
+  EXPECT_EQ(original_bucket_count, 256u);
+
+  // Shrink to half the size using Resize
+  size_t new_size = original_bucket_count / 2;
+  ss_->Resize(new_size);
+
+  EXPECT_EQ(ss_->BucketCount(), new_size);
+  EXPECT_EQ(ss_->UpperBoundSize(), num_strs);
+
+  // Verify all elements are still accessible
+  for (const auto& str : strs) {
+    EXPECT_TRUE(ss_->Contains(str)) << "Missing: " << str;
+  }
+}
+
+TEST_F(StringSetTest, ShrinkGrowRoundtrip) {
+  // Add elements
+  constexpr size_t num_strs = 16;
+  vector<string> strs;
+  for (size_t i = 0; i < num_strs; ++i) {
+    strs.push_back(random_string(generator_, 10));
+    EXPECT_TRUE(ss_->Add(strs.back()));
+  }
+
+  // Grow to a larger size
+  ss_->Resize(128);
+  size_t original_bucket_count = ss_->BucketCount();
+
+  // Shrink using Resize
+  size_t shrunk_size = original_bucket_count / 2;
+  ss_->Resize(shrunk_size);
+  EXPECT_EQ(ss_->BucketCount(), shrunk_size);
+
+  // Grow back
+  ss_->Resize(original_bucket_count);
+  EXPECT_EQ(ss_->BucketCount(), original_bucket_count);
+
+  // Verify all elements are still accessible
+  for (const auto& str : strs) {
+    EXPECT_TRUE(ss_->Contains(str)) << "Missing: " << str;
+  }
+}
+
+TEST_F(StringSetTest, ShrinkWithTTL) {
+  // Add elements with TTL
+  constexpr size_t num_strs = 16;
+  vector<string> strs;
+  for (size_t i = 0; i < num_strs; ++i) {
+    strs.push_back(random_string(generator_, 10));
+    EXPECT_TRUE(ss_->Add(strs.back(), 100));  // TTL of 100
+  }
+
+  // Grow to a larger size
+  ss_->Resize(128);
+  size_t original_bucket_count = ss_->BucketCount();
+
+  // Shrink using Resize
+  size_t new_size = original_bucket_count / 2;
+  ss_->Resize(new_size);
+
+  EXPECT_EQ(ss_->BucketCount(), new_size);
+  EXPECT_EQ(ss_->UpperBoundSize(), num_strs);
+
+  // Verify all elements are still accessible with correct TTL
+  for (const auto& str : strs) {
+    auto it = ss_->Find(str);
+    ASSERT_NE(it, ss_->end()) << "Missing: " << str;
+    EXPECT_TRUE(it.HasExpiry());
+    EXPECT_EQ(it.ExpiryTime(), 100);
+  }
+}
+
+TEST_F(StringSetTest, MultipleShrinks) {
+  // Add a few elements, then grow to a large size
+  constexpr size_t num_strs = 8;
+  vector<string> strs;
+  for (size_t i = 0; i < num_strs; ++i) {
+    strs.push_back(random_string(generator_, 10));
+    EXPECT_TRUE(ss_->Add(strs.back()));
+  }
+
+  // Grow to a large size
+  ss_->Resize(256);
+  size_t bucket_count = ss_->BucketCount();
+
+  // Shrink multiple times using Resize
+  while (bucket_count > 8) {  // Stop at 8 since we have 8 elements
+    size_t new_size = bucket_count / 2;
+    ss_->Resize(new_size);
+    bucket_count = ss_->BucketCount();
+    EXPECT_EQ(bucket_count, new_size);
+
+    // Verify all elements are still accessible after each shrink
+    for (const auto& str : strs) {
+      EXPECT_TRUE(ss_->Contains(str)) << "Missing after shrink to " << new_size << ": " << str;
+    }
+  }
+}
+
+TEST_F(StringSetTest, ShrinkThenAddMore) {
+  // Add initial elements
+  constexpr size_t initial_strs = 16;
+  vector<string> strs;
+  for (size_t i = 0; i < initial_strs; ++i) {
+    strs.push_back(random_string(generator_, 10));
+    EXPECT_TRUE(ss_->Add(strs.back()));
+  }
+
+  // Grow to a larger size
+  ss_->Resize(128);
+  size_t original_bucket_count = ss_->BucketCount();
+
+  // Shrink using Resize
+  size_t new_size = original_bucket_count / 2;
+  ss_->Resize(new_size);
+  EXPECT_EQ(ss_->BucketCount(), new_size);
+
+  // Add more elements after shrink
+  constexpr size_t additional_strs = 32;
+  for (size_t i = 0; i < additional_strs; ++i) {
+    strs.push_back(random_string(generator_, 10));
+    EXPECT_TRUE(ss_->Add(strs.back()));
+  }
+
+  EXPECT_EQ(ss_->UpperBoundSize(), initial_strs + additional_strs);
+
+  // Verify all elements are accessible
+  for (const auto& str : strs) {
+    EXPECT_TRUE(ss_->Contains(str)) << "Missing: " << str;
+  }
+}
+
 }  // namespace dfly
diff --git a/src/server/hset_family.cc b/src/server/hset_family.cc
index a7b45a263512..db39ecf7071a 100644
--- a/src/server/hset_family.cc
+++ b/src/server/hset_family.cc
@@ -475,7 +475,7 @@ OpResult OpSet(const OpArgs& op_args, string_view key, CmdArgList valu
   } else {
     DCHECK_EQ(kEncodingStrMap2, pv.Encoding());  // Dictionary
     StringMap* sm = GetStringMap(pv, op_args.db_cntx);
-    sm->Reserve(values.size() / 2);
+    sm->Resize(values.size() / 2);
     bool added;
 
     for (size_t i = 0; i < values.size(); i += 2) {
@@ -1047,7 +1047,7 @@ StringMap* HSetFamily::ConvertToStrMap(uint8_t* lp) {
   StringMap* sm = CompactObj::AllocateMR<StringMap>();
   detail::ListpackWrap lw{lp};
-  sm->Reserve(lw.size());
+  sm->Resize(lw.size());
   for (const auto [key, value] : lw)
     LOG_IF(ERROR, !sm->AddOrUpdate(key, value)) << "Internal error: duplicate key " << key;
   return sm;
 }
diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc
index cfec42a73b45..1041cdef2cbd 100644
--- a/src/server/rdb_load.cc
+++ b/src/server/rdb_load.cc
@@ -370,7 +370,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
     inner_obj = set;
 
     // Expand the set up front to avoid rehashing.
-    set->Reserve((config_.reserve > len) ? config_.reserve : len);
+    set->Resize((config_.reserve > len) ? config_.reserve : len);
   }
 
   size_t increment = 1;
@@ -480,7 +480,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) {
     string_map->set_time(MemberTimeSeconds(GetCurrentTimeMs()));
 
     // Expand the map up front to avoid rehashing.
-    string_map->Reserve((config_.reserve > len) ? config_.reserve : len);
+    string_map->Resize((config_.reserve > len) ? config_.reserve : len);
   }
 
   auto cleanup = absl::MakeCleanup([&] {
diff --git a/src/server/set_family.cc b/src/server/set_family.cc
index 94e4ce1c4727..0d63d8c21346 100644
--- a/src/server/set_family.cc
+++ b/src/server/set_family.cc
@@ -96,7 +96,7 @@ struct StringSetWrapper {
     size_t entries_len = std::visit([](const auto& e) { return e.size(); }, entries);
     unsigned len = 0;
     if (ss->BucketCount() < entries_len) {
-      ss->Reserve(entries_len);
+      ss->Resize(entries_len);
     }
     for (string_view member : EntriesRange(entries)) {
      members[len++] = member;
@@ -1512,7 +1512,7 @@ StringSet* SetFamily::ConvertToStrSet(const intset* is, size_t expected_len) {
   StringSet* ss = CompactObj::AllocateMR<StringSet>();
 
   if (expected_len) {
-    ss->Reserve(expected_len);
+    ss->Resize(expected_len);
   }
 
   while (intsetGet(const_cast<intset*>(is), ii++, &intele)) {