diff --git a/CMakeLists.txt b/CMakeLists.txt index b9c4835..c1b3da0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -138,6 +138,16 @@ target_link_libraries(milvus-common PUBLIC target_link_libraries(milvus-common PUBLIC OpenMP::OpenMP_CXX) +# thread_pool.cc uses openblas-specific API (openblas_set_num_threads) under OPENBLAS_OS_LINUX +if(NOT APPLE) + find_library(OPENBLAS_LIBRARY NAMES openblas) + if(OPENBLAS_LIBRARY) + target_link_libraries(milvus-common PUBLIC ${OPENBLAS_LIBRARY}) + else() + message(WARNING "OpenBLAS not found, openblas_set_num_threads will be unavailable") + endif() +endif() + if(WITH_COMMON_UT) # Enable unit-test-only helpers for caching layer target_compile_definitions(milvus-common PRIVATE MCL_ENABLE_TEST_CGROUP_OVERRIDE) diff --git a/include/cachinglayer/CacheSlot.h b/include/cachinglayer/CacheSlot.h index cead4f2..f83ac78 100644 --- a/include/cachinglayer/CacheSlot.h +++ b/include/cachinglayer/CacheSlot.h @@ -29,6 +29,7 @@ #include #include +#include "cachinglayer/LoadingOverheadTracker.h" #include "cachinglayer/Metrics.h" #include "cachinglayer/Translator.h" #include "cachinglayer/Utils.h" @@ -54,7 +55,8 @@ class CacheSlot final : public std::enable_shared_from_this> { "representing the memory consumption of the cell"); CacheSlot(std::unique_ptr> translator, internal::DList* dlist, bool evictable, bool self_reserve, - bool storage_usage_tracking_enabled, std::chrono::milliseconds loading_timeout) + bool storage_usage_tracking_enabled, std::chrono::milliseconds loading_timeout, + LoadingOverheadTracker* loading_overhead_tracker = nullptr) : translator_(std::move(translator)), cell_id_mapping_mode_(translator_->meta()->cell_id_mapping_mode), cell_data_type_(translator_->meta()->cell_data_type), @@ -63,7 +65,8 @@ class CacheSlot final : public std::enable_shared_from_this> { evictable_(evictable), self_reserve_(self_reserve), storage_usage_tracking_enabled_(storage_usage_tracking_enabled), - loading_timeout_(loading_timeout) { + loading_timeout_(loading_timeout), + loading_overhead_tracker_(loading_overhead_tracker) { cells_.reserve(translator_->num_cells()); for (cid_t i = 0; i < static_cast(translator_->num_cells()); ++i) { cells_.push_back(std::make_unique(this, i)); @@ -407,9 +410,18 @@ class CacheSlot final : public std::enable_shared_from_this> { void RunLoad(OpContext* ctx, std::unordered_set&& cids, std::chrono::milliseconds timeout) { + // loaded_resource: the estimated final resource usage (from .first), reserved unconditionally. + // loading_resource: the estimated temporary overhead during loading (from .second), + // capped at per-type UB via LoadingOverheadTracker. The tracker returns the incremental + // delta to reserve from DList, so total loading in DList = min(sum, UB) per type. + ResourceUsage essential_loaded_resource{}; ResourceUsage essential_loading_resource{}; + ResourceUsage bonus_loaded_resource{}; ResourceUsage bonus_loading_resource{}; std::vector loading_cids; + // Tracks the loading overhead currently reserved in the tracker but not yet + // guarded by defer_release. Used to clean up tracker state on exception. + ResourceUsage tracker_pending_loading{}; try { auto start = std::chrono::steady_clock::now(); bool reservation_success = false; @@ -439,16 +451,31 @@ class CacheSlot final : public std::enable_shared_from_this> { auto bonus_cids = translator_->bonus_cells_to_be_loaded(loading_cids); for (auto& cid : loading_cids) { - essential_loading_resource += translator_->estimated_byte_size_of_cell(cid).second; + auto [loaded, loading] = translator_->estimated_byte_size_of_cell(cid); + essential_loaded_resource += loaded; + essential_loading_resource += loading; } for (auto& cid : bonus_cids) { - bonus_loading_resource += translator_->estimated_byte_size_of_cell(cid).second; + auto [loaded, loading] = translator_->estimated_byte_size_of_cell(cid); + bonus_loaded_resource += loaded; + bonus_loading_resource += loading; } - auto resource_needed_for_loading = essential_loading_resource + bonus_loading_resource; + // loaded_resource is reserved unconditionally from DList (no capping). + // loading_resource goes through the tracker: returns delta = change in min(sum, UB). + auto loaded_resource = essential_loaded_resource + bonus_loaded_resource; + auto loading_resource = essential_loading_resource + bonus_loading_resource; + + auto loading_delta = loading_resource; + if (loading_overhead_tracker_) { + loading_delta = loading_overhead_tracker_->Reserve(cell_data_type_, loading_resource); + tracker_pending_loading = loading_resource; + } + auto actual_dlist_reserve = loaded_resource + loading_delta; + reservation_success = - SemiInlineGet(dlist_->ReserveLoadingResourceWithTimeout(resource_needed_for_loading, timeout, ctx)); + SemiInlineGet(dlist_->ReserveLoadingResourceWithTimeout(actual_dlist_reserve, timeout, ctx)); if (!bonus_cids.empty()) { // if the reservation failed, try to reserve only the essential loading resource @@ -457,9 +484,22 @@ class CacheSlot final : public std::enable_shared_from_this> { "[MCL] CacheSlot reserve loading resource with bonus cells failed, try to reserve only " "essential " "loading resource"); - resource_needed_for_loading = essential_loading_resource; - reservation_success = SemiInlineGet( - dlist_->ReserveLoadingResourceWithTimeout(resource_needed_for_loading, timeout, ctx)); + // Undo the tracker state for the full reservation + if (loading_overhead_tracker_) { + loading_overhead_tracker_->Release(cell_data_type_, loading_resource); + tracker_pending_loading = {}; + } + loaded_resource = essential_loaded_resource; + loading_resource = essential_loading_resource; + if (loading_overhead_tracker_) { + loading_delta = loading_overhead_tracker_->Reserve(cell_data_type_, loading_resource); + tracker_pending_loading = loading_resource; + } else { + loading_delta = loading_resource; + } + actual_dlist_reserve = loaded_resource + loading_delta; + reservation_success = + SemiInlineGet(dlist_->ReserveLoadingResourceWithTimeout(actual_dlist_reserve, timeout, ctx)); } else { // if the reservation succeeded, we can load the bonus cells loading_cids.insert(loading_cids.end(), bonus_cids.begin(), bonus_cids.end()); @@ -467,56 +507,77 @@ class CacheSlot final : public std::enable_shared_from_this> { } if (!reservation_success) { + // Undo tracker state on failure + if (loading_overhead_tracker_) { + loading_overhead_tracker_->Release(cell_data_type_, loading_resource); + tracker_pending_loading = {}; + } LOG_ERROR( "[MCL] CacheSlot failed to reserve resource for " - "cells: key={}, cell_ids=[{}], total " - "resource_needed_for_loading={}", - translator_->key(), fmt::join(loading_cids, ","), resource_needed_for_loading.ToString()); + "cells: key={}, cell_ids=[{}], " + "loaded_resource={}, loading_resource={}, actual_dlist_reserve={}", + translator_->key(), fmt::join(loading_cids, ","), loaded_resource.ToString(), + loading_resource.ToString(), actual_dlist_reserve.ToString()); ThrowInfo(ErrorCode::InsufficientResource, "[MCL] CacheSlot failed to reserve resource for " - "cells: key={}, cell_ids=[{}], total " - "resource_needed_for_loading={}", - translator_->key(), fmt::join(loading_cids, ","), resource_needed_for_loading.ToString()); + "cells: key={}, cell_ids=[{}], " + "loaded_resource={}, loading_resource={}, actual_dlist_reserve={}", + translator_->key(), fmt::join(loading_cids, ","), loaded_resource.ToString(), + loading_resource.ToString(), actual_dlist_reserve.ToString()); } monitor::cache_loading_bytes(cell_data_type_, StorageType::MEMORY) - .Increment(resource_needed_for_loading.memory_bytes); - monitor::cache_loading_bytes(cell_data_type_, StorageType::DISK) - .Increment(resource_needed_for_loading.file_bytes); + .Increment(actual_dlist_reserve.memory_bytes); + monitor::cache_loading_bytes(cell_data_type_, StorageType::DISK).Increment(actual_dlist_reserve.file_bytes); monitor::cache_cell_loading_count(cell_data_type_, storage_type_).Increment(loading_cids.size()); - // defer release resource_needed_for_loading - auto defer_release = folly::makeGuard([this, &resource_needed_for_loading, &loading_cids]() { + // defer release: loaded_resource + loading delta from tracker. + // Once created, defer_release owns the tracker cleanup — clear tracker_pending_loading. + auto loading_cids_count = loading_cids.size(); + auto defer_release = folly::makeGuard([this, loaded_resource, loading_resource, loading_cids_count]() { try { - dlist_->ReleaseLoadingResource(resource_needed_for_loading); - monitor::cache_cell_loading_count(cell_data_type_, storage_type_).Decrement(loading_cids.size()); + auto release_delta = loading_resource; + if (loading_overhead_tracker_) { + release_delta = loading_overhead_tracker_->Release(cell_data_type_, loading_resource); + } + auto dlist_release = loaded_resource + release_delta; + dlist_->ReleaseLoadingResource(dlist_release); + monitor::cache_cell_loading_count(cell_data_type_, storage_type_).Decrement(loading_cids_count); monitor::cache_loading_bytes(cell_data_type_, StorageType::MEMORY) - .Decrement(resource_needed_for_loading.memory_bytes); + .Decrement(dlist_release.memory_bytes); monitor::cache_loading_bytes(cell_data_type_, StorageType::DISK) - .Decrement(resource_needed_for_loading.file_bytes); + .Decrement(dlist_release.file_bytes); } catch (...) { auto exception = std::current_exception(); auto ew = folly::exception_wrapper(exception); LOG_ERROR( - "[MCL] CacheSlot failed to release loading resource for cells with exception, something must " + "[MCL] CacheSlot failed to release loading resource for cells with exception, something " + "must " "be wrong: " "key={}, " - "loading_cids=[{}], error={}", - translator_->key(), fmt::join(loading_cids, ","), ew.what()); + "loading_cids_count={}, error={}", + translator_->key(), loading_cids_count, ew.what()); } }); + tracker_pending_loading = {}; LOG_TRACE( "[MCL] CacheSlot reserveLoadingResourceWithTimeout {} sec " - "result: {} time: {} sec, resource_needed: {}, key: {}", + "result: {} time: {} sec, loaded_resource: {}, loading_resource: {}, " + "actual_dlist_reserve: {}, key: {}", timeout.count() / 1000.0, reservation_success ? "success" : "failed", std::chrono::duration_cast(std::chrono::steady_clock::now() - start) .count() * 1.0 / 1000, - resource_needed_for_loading.ToString(), translator_->key()); + loaded_resource.ToString(), loading_resource.ToString(), actual_dlist_reserve.ToString(), + translator_->key()); run_load_internal(); } catch (...) { + // Clean up tracker state if not yet taken over by defer_release + if (loading_overhead_tracker_ && tracker_pending_loading.AnyGTZero()) { + loading_overhead_tracker_->Release(cell_data_type_, tracker_pending_loading); + } auto exception = std::current_exception(); auto ew = folly::exception_wrapper(exception); monitor::cache_load_event_fail_total(cell_data_type_, storage_type_).Increment(); @@ -646,6 +707,7 @@ class CacheSlot final : public std::enable_shared_from_this> { const bool self_reserve_; const bool storage_usage_tracking_enabled_; std::chrono::milliseconds loading_timeout_{100000}; + LoadingOverheadTracker* loading_overhead_tracker_{nullptr}; std::atomic warmup_called_{false}; std::atomic skip_pin_{false}; }; diff --git a/include/cachinglayer/LoadingOverheadTracker.h b/include/cachinglayer/LoadingOverheadTracker.h new file mode 100644 index 0000000..b2ecc8e --- /dev/null +++ b/include/cachinglayer/LoadingOverheadTracker.h @@ -0,0 +1,161 @@ +// Copyright (C) 2019-2026 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#pragma once + +#include +#include +#include + +#include "cachinglayer/Utils.h" +#include "log/Log.h" + +namespace milvus::cachinglayer { + +// Manages per-CellDataType loading overhead reservation with an overhead upper bound (UB). +// +// The total loading overhead reserved from DList for a given type is capped at UB: +// DList loading overhead reservation = min(sum_of_overhead, UB) +// +// Each Reserve/Release call returns the incremental delta to apply to DList. +// The tracker directly tracks `overhead_reserved` (actual amount of overhead currently reserved in DList) to ensure +// correctness even when UB changes at runtime. +// +// The total DList resource reservation across all requests of a type = sum(loaded_i) + min(sum(overhead_i), UB), +// which does not inflate with the number of queuing requests. +// +// By default, types are registered with kUnlimited UB, preserving original behavior (no capping). +class LoadingOverheadTracker { + public: + static inline const ResourceUsage kUnlimited{std::numeric_limits::max(), + std::numeric_limits::max()}; + + // Register the upper bound for a given CellDataType. + // If already registered with a finite UB, uses the larger of the two UBs per dimension. + // If previously registered with kUnlimited (default), replaces with the given UB. + void + RegisterUpperBound(CellDataType type, const ResourceUsage& upper_bound) { + std::lock_guard lock(mtx_); + auto [it, inserted] = type_state_.try_emplace(type, TypeState{upper_bound, {}, {}}); + if (!inserted) { + auto& existing = it->second.upper_bound; + if (existing == kUnlimited) { + existing = upper_bound; + } else { + existing.memory_bytes = std::max(existing.memory_bytes, upper_bound.memory_bytes); + existing.file_bytes = std::max(existing.file_bytes, upper_bound.file_bytes); + } + } + LOG_INFO("[MCL] LoadingOverheadTracker registered UB for type {}: {}", static_cast(type), + upper_bound.ToString()); + } + + // Ensure a type is registered. If not yet registered, registers with kUnlimited. + void + EnsureRegistered(CellDataType type) { + std::lock_guard lock(mtx_); + type_state_.try_emplace(type, TypeState{kUnlimited, {}, {}}); + } + + // Called before loading. Returns the delta to reserve from DList for loading overhead. + // Caller should: DList.Reserve(loaded_resource + delta) + ResourceUsage + Reserve(CellDataType type, const ResourceUsage& loading_overhead) { + std::lock_guard lock(mtx_); + auto& state = getOrCreateState(type); + state.sum_of_overhead += loading_overhead; + auto target = cappedAmount(state.sum_of_overhead, state.upper_bound); + auto delta = target - state.overhead_reserved; + delta.memory_bytes = std::max(delta.memory_bytes, int64_t{0}); + delta.file_bytes = std::max(delta.file_bytes, int64_t{0}); + state.overhead_reserved += delta; + LOG_TRACE( + "[MCL] LoadingOverheadTracker Reserve type={}: loading={}, " + "sum={}, UB={}, overhead_reserved={}, delta={}", + static_cast(type), loading_overhead.ToString(), state.sum_of_overhead.ToString(), + state.upper_bound.ToString(), state.overhead_reserved.ToString(), delta.ToString()); + return delta; + } + + // Called to release a previous Reserve (either after loading completes, or to undo + // a Reserve when DList reservation fails / bonus cells retry). + // Returns the delta to release from DList for loading overhead. + ResourceUsage + Release(CellDataType type, const ResourceUsage& loading_overhead) { + std::lock_guard lock(mtx_); + auto& state = getOrCreateState(type); + state.sum_of_overhead -= loading_overhead; + if (state.sum_of_overhead.memory_bytes < 0) { + LOG_ERROR("[MCL] LoadingOverheadTracker ReleaseInternal type={}: sum_of_overhead.memory_bytes < 0", + static_cast(type)); + state.sum_of_overhead.memory_bytes = 0; + } + if (state.sum_of_overhead.file_bytes < 0) { + LOG_ERROR("[MCL] LoadingOverheadTracker ReleaseInternal type={}: sum_of_overhead.file_bytes < 0", + static_cast(type)); + state.sum_of_overhead.file_bytes = 0; + } + auto target = cappedAmount(state.sum_of_overhead, state.upper_bound); + auto delta = state.overhead_reserved - target; + delta.memory_bytes = std::max(delta.memory_bytes, int64_t{0}); + delta.file_bytes = std::max(delta.file_bytes, int64_t{0}); + state.overhead_reserved -= delta; + LOG_TRACE( + "[MCL] LoadingOverheadTracker Release type={}: loading={}, " + "sum={}, UB={}, overhead_reserved={}, delta={}", + static_cast(type), loading_overhead.ToString(), state.sum_of_overhead.ToString(), + state.upper_bound.ToString(), state.overhead_reserved.ToString(), delta.ToString()); + return delta; + } + + // Check if a type has a finite (non-unlimited) upper bound registered. + bool + HasFiniteUpperBound(CellDataType type) const { + std::lock_guard lock(mtx_); + auto it = type_state_.find(type); + return it != type_state_.end() && !(it->second.upper_bound == kUnlimited); + } + + // Get the upper bound for a type. + ResourceUsage + GetUpperBound(CellDataType type) const { + std::lock_guard lock(mtx_); + auto it = type_state_.find(type); + if (it == type_state_.end()) { + return kUnlimited; + } + return it->second.upper_bound; + } + + private: + struct TypeState { + ResourceUsage upper_bound; + ResourceUsage sum_of_overhead; // total loading overhead requested (uncapped) + ResourceUsage overhead_reserved; // actual amount currently reserved in DList + }; + + static ResourceUsage + cappedAmount(const ResourceUsage& sum, const ResourceUsage& ub) { + return {std::min(std::max(sum.memory_bytes, int64_t{0}), ub.memory_bytes), + std::min(std::max(sum.file_bytes, int64_t{0}), ub.file_bytes)}; + } + + TypeState& + getOrCreateState(CellDataType type) { + auto [it, _] = type_state_.try_emplace(type, TypeState{kUnlimited, {}, {}}); + return it->second; + } + + mutable std::mutex mtx_; + std::unordered_map type_state_; +}; + +} // namespace milvus::cachinglayer diff --git a/include/cachinglayer/Manager.h b/include/cachinglayer/Manager.h index 793d8bc..d4c4da9 100644 --- a/include/cachinglayer/Manager.h +++ b/include/cachinglayer/Manager.h @@ -16,6 +16,7 @@ #include #include "cachinglayer/CacheSlot.h" +#include "cachinglayer/LoadingOverheadTracker.h" #include "cachinglayer/Translator.h" #include "cachinglayer/Utils.h" #include "cachinglayer/lrucache/DList.h" @@ -55,9 +56,20 @@ class Manager { } auto evictable = translator->meta()->support_eviction && eviction_enabled_; auto self_reserve = eviction_enabled_; - auto cache_slot = - std::make_shared>(std::move(translator), dlist_.get(), evictable, self_reserve, - storage_usage_tracking_enabled_, loading_timeout_); + + // Register loading overhead upper bound for this CellDataType. + // If the translator specifies a finite UB, use it; otherwise register + // with unlimited UB to ensure every type goes through the tracker. + if (translator->meta()->loading_overhead_upper_bound.has_value()) { + loading_overhead_tracker_.RegisterUpperBound(translator->meta()->cell_data_type, + translator->meta()->loading_overhead_upper_bound.value()); + } else { + loading_overhead_tracker_.EnsureRegistered(translator->meta()->cell_data_type); + } + + auto cache_slot = std::make_shared>(std::move(translator), dlist_.get(), evictable, + self_reserve, storage_usage_tracking_enabled_, + loading_timeout_, &loading_overhead_tracker_); cache_slot->Warmup(ctx, prefetch_pool_); return cache_slot; } @@ -137,6 +149,7 @@ class Manager { std::shared_ptr dlist_{nullptr}; std::shared_ptr prefetch_pool_{nullptr}; + LoadingOverheadTracker loading_overhead_tracker_; CacheWarmupPolicies warmup_policies_{}; bool storage_usage_tracking_enabled_{false}; bool eviction_enabled_{false}; diff --git a/include/cachinglayer/Translator.h b/include/cachinglayer/Translator.h index f20e68c..1316a4f 100644 --- a/include/cachinglayer/Translator.h +++ b/include/cachinglayer/Translator.h @@ -12,6 +12,7 @@ #pragma once #include +#include #include #include @@ -31,13 +32,21 @@ struct Meta { // Whether the translator supports strategy based eviction. // Does not affect manual eviction. bool support_eviction; + // Upper bound for loading overhead reservation for this CellDataType. + // When set, the total loading overhead across all CacheSlots of this type + // is capped at this value, preventing over-reservation when many concurrent + // loads happen. The real resource usage is bounded by loading_pool_size * cell_size. + // If not set, no capping is applied (existing behavior). + std::optional loading_overhead_upper_bound; explicit Meta(StorageType storage_type, CellIdMappingMode cell_id_mapping_mode, CellDataType cell_data_type, - CacheWarmupPolicy cache_warmup_policy, bool support_eviction) + CacheWarmupPolicy cache_warmup_policy, bool support_eviction, + std::optional loading_overhead_upper_bound = std::nullopt) : storage_type(storage_type), cell_id_mapping_mode(cell_id_mapping_mode), cell_data_type(cell_data_type), cache_warmup_policy(cache_warmup_policy), - support_eviction(support_eviction) { + support_eviction(support_eviction), + loading_overhead_upper_bound(loading_overhead_upper_bound) { } }; @@ -51,9 +60,13 @@ class Translator { virtual cid_t cell_id_of(uid_t uid) const = 0; // For resource reservation when a cell is about to be loaded. - // There are two types of resource usage for a cell: the first is the usage after it has been loaded, - // and the second is the usage during loading. Typically, the loading usage is greater than the loaded usage - // due to the preprocessing stage. + // Returns {loaded_usage, loading_overhead}: + // - loaded_usage (first): the final resource usage after the cell is fully loaded and in cache. + // - loading_overhead (second): the *temporary* resource usage during loading (e.g., preprocessing buffers), + // excluding the final loaded usage. This is the extra overhead that only exists during the loading phase. + // When loading_overhead_upper_bound is set in Meta, the total loading reservation across all CacheSlots + // of the same CellDataType is capped at that upper bound, since actual concurrent resource usage is + // bounded by loading_pool_size * cell_size. // If a cell is about to be pinned and loaded, and there are not enough resource for it, EvictionManager // will try to evict some other cells to make space. Thus this estimation should generally be greater // than or equal to the actual size. If the estimation is smaller than the actual size, with insufficient diff --git a/test/test_cachinglayer/CMakeLists.txt b/test/test_cachinglayer/CMakeLists.txt index 75794e7..1a51ee9 100644 --- a/test/test_cachinglayer/CMakeLists.txt +++ b/test/test_cachinglayer/CMakeLists.txt @@ -9,7 +9,6 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express # or implied. See the License for the specific language governing permissions and limitations under the License - find_package(GTest CONFIG REQUIRED) find_package(BLAS REQUIRED) @@ -17,6 +16,7 @@ set(CACHINGLAYER_TEST_FILES ../init_gtest.cpp test_dlist.cpp test_cache_slot.cpp + test_loading_overhead_tracker.cpp test_utils.cpp ) @@ -24,7 +24,6 @@ add_executable(cachinglayer_test ${CACHINGLAYER_TEST_FILES} ) - target_link_libraries(cachinglayer_test GTest::gtest GTest::gmock diff --git a/test/test_cachinglayer/test_cache_slot.cpp b/test/test_cachinglayer/test_cache_slot.cpp index b55f052..b12a745 100644 --- a/test/test_cachinglayer/test_cache_slot.cpp +++ b/test/test_cachinglayer/test_cache_slot.cpp @@ -84,9 +84,9 @@ class MockTranslator : public Translator { estimated_byte_size_of_cell(cid_t cid) const override { auto it = cell_sizes_.find(cid); if (it != cell_sizes_.end()) { - return {{it->second, 0}, {it->second, 0}}; + return {{it->second, 0}, {loading_overhead_bytes_, 0}}; } - return {{1, 0}, {1, 0}}; + return {{1, 0}, {loading_overhead_bytes_, 0}}; } int64_t @@ -179,6 +179,10 @@ class MockTranslator : public Translator { SetExtraReturnCids(std::unordered_map> extra_cids) { extra_cids_ = extra_cids; } + void + SetLoadingOverheadBytes(int64_t bytes) { + loading_overhead_bytes_ = bytes; + } int GetCellsCallCount() const { EXPECT_FALSE(for_concurrent_test_); @@ -210,6 +214,7 @@ class MockTranslator : public Translator { std::unordered_map> extra_cids_; std::atomic get_cells_call_count_ = 0; std::vector> requested_cids_; + int64_t loading_overhead_bytes_ = 0; // this class is not concurrent safe, so if for concurrent test, do not track usage bool for_concurrent_test_ = false; @@ -1022,9 +1027,9 @@ class MockTranslatorWithWarmup : public Translator { estimated_byte_size_of_cell(cid_t cid) const override { auto it = cell_sizes_.find(cid); if (it != cell_sizes_.end()) { - return {{it->second, 0}, {it->second, 0}}; + return {{it->second, 0}, {0, 0}}; } - return {{1, 0}, {1, 0}}; + return {{1, 0}, {0, 0}}; } int64_t @@ -1682,3 +1687,75 @@ TEST(AsyncWarmupTest, DisabledWarmupStillWorks) { prefetch_pool->join(); } + +// Test that CacheSlot correctly integrates with LoadingOverheadTracker when +// the translator reports non-zero loading overhead. +TEST(CacheSlotTrackerTest, LoadingOverheadTrackerIntegration) { + ResourceUsage limit{10000, 0}; + auto dlist = std::make_shared(true, limit, limit, limit, EvictionConfig{10, true, 600}); + + LoadingOverheadTracker tracker; + tracker.RegisterUpperBound(CellDataType::OTHER, {500, 0}); + + const int64_t cell_loaded_size = 100; + const int64_t cell_loading_overhead = 200; + + auto translator = std::make_unique( + std::vector>{{0, cell_loaded_size}, {1, cell_loaded_size}}, + std::unordered_map{{0, 0}, {1, 1}}, "test_tracker_integration", StorageType::MEMORY); + translator->SetLoadingOverheadBytes(cell_loading_overhead); + auto* translator_ptr = translator.get(); + + auto cache_slot = std::make_shared>(std::move(translator), dlist.get(), true, true, false, + std::chrono::milliseconds(5000), &tracker); + + auto op_ctx = std::make_unique(); + + // Pin cell 0: should reserve loaded(100) + overhead delta from tracker + auto accessor = cache_slot->PinCellsDirect(op_ctx.get(), {0}); + ASSERT_NE(accessor, nullptr); + EXPECT_EQ(accessor->get_cell_of(0)->data, 0); + + // Pin cell 1: should also go through tracker + auto accessor2 = cache_slot->PinCellsDirect(op_ctx.get(), {1}); + ASSERT_NE(accessor2, nullptr); + EXPECT_EQ(accessor2->get_cell_of(1)->data, 10); + + EXPECT_EQ(translator_ptr->GetCellsCallCount(), 2); +} + +// Test that tracker state is properly cleaned up when load throws an exception. +TEST(CacheSlotTrackerTest, LoadingOverheadTrackerCleanupOnException) { + ResourceUsage limit{10000, 0}; + auto dlist = std::make_shared(true, limit, limit, limit, EvictionConfig{10, true, 600}); + + LoadingOverheadTracker tracker; + tracker.RegisterUpperBound(CellDataType::OTHER, {500, 0}); + + const int64_t cell_loaded_size = 100; + const int64_t cell_loading_overhead = 200; + + auto translator = std::make_unique(std::vector>{{0, cell_loaded_size}}, + std::unordered_map{{0, 0}}, + "test_tracker_exception", StorageType::MEMORY); + translator->SetLoadingOverheadBytes(cell_loading_overhead); + translator->SetShouldThrow(true); + + auto cache_slot = std::make_shared>(std::move(translator), dlist.get(), true, true, false, + std::chrono::milliseconds(5000), &tracker); + + auto op_ctx = std::make_unique(); + + // Pin should fail because translator throws, but tracker state must be cleaned up. + // The exception is caught internally by RunLoad and set as error on the cell. + EXPECT_ANY_THROW(cache_slot->PinCellsDirect(op_ctx.get(), {0})); + + // After the failed load, tracker state should be clean. + // Verify by reserving again — if state leaked, the tracker would have stale sum_of_overhead. + // Reserve the full UB amount — should succeed fully if no leak. + auto delta = tracker.Reserve(CellDataType::OTHER, {500, 0}); + EXPECT_EQ(delta.memory_bytes, 500); + + auto release = tracker.Release(CellDataType::OTHER, {500, 0}); + EXPECT_EQ(release.memory_bytes, 500); +} diff --git a/test/test_cachinglayer/test_loading_overhead_tracker.cpp b/test/test_cachinglayer/test_loading_overhead_tracker.cpp new file mode 100644 index 0000000..3a91d86 --- /dev/null +++ b/test/test_cachinglayer/test_loading_overhead_tracker.cpp @@ -0,0 +1,299 @@ +#include + +#include +#include + +#include "cachinglayer/LoadingOverheadTracker.h" +#include "cachinglayer/Utils.h" + +using namespace milvus::cachinglayer; + +class LoadingOverheadTrackerTest : public ::testing::Test { + protected: + LoadingOverheadTracker tracker_; +}; + +TEST_F(LoadingOverheadTrackerTest, NoUpperBoundPassThrough) { + // Without registering a UB, all amounts pass through unchanged. + auto delta = tracker_.Reserve(CellDataType::VECTOR_INDEX, {100, 0}); + EXPECT_EQ(delta.memory_bytes, 100); + EXPECT_EQ(delta.file_bytes, 0); + + auto release = tracker_.Release(CellDataType::VECTOR_INDEX, {100, 0}); + EXPECT_EQ(release.memory_bytes, 100); + EXPECT_EQ(release.file_bytes, 0); +} + +TEST_F(LoadingOverheadTrackerTest, BasicCapping) { + tracker_.RegisterUpperBound(CellDataType::VECTOR_INDEX, {200, 0}); + + // First reserve: 100, sum=100 <= UB=200, full amount passes through + auto d1 = tracker_.Reserve(CellDataType::VECTOR_INDEX, {100, 0}); + EXPECT_EQ(d1.memory_bytes, 100); + + // Second reserve: 100, sum=200 <= UB=200, full amount passes through + auto d2 = tracker_.Reserve(CellDataType::VECTOR_INDEX, {100, 0}); + EXPECT_EQ(d2.memory_bytes, 100); + + // Third reserve: 100, sum=300 > UB=200, capped: delta = 0 + auto d3 = tracker_.Reserve(CellDataType::VECTOR_INDEX, {100, 0}); + EXPECT_EQ(d3.memory_bytes, 0); + + // Fourth reserve: 100, sum=400 > UB=200, capped: delta = 0 + auto d4 = tracker_.Reserve(CellDataType::VECTOR_INDEX, {100, 0}); + EXPECT_EQ(d4.memory_bytes, 0); +} + +TEST_F(LoadingOverheadTrackerTest, BasicRelease) { + tracker_.RegisterUpperBound(CellDataType::VECTOR_INDEX, {200, 0}); + + // Reserve 4x100, total sum=400, actual DList reserve = 200 + tracker_.Reserve(CellDataType::VECTOR_INDEX, {100, 0}); + tracker_.Reserve(CellDataType::VECTOR_INDEX, {100, 0}); + tracker_.Reserve(CellDataType::VECTOR_INDEX, {100, 0}); + tracker_.Reserve(CellDataType::VECTOR_INDEX, {100, 0}); + + // Release first 100: sum 400->300, both >= UB, release 0 + auto r1 = tracker_.Release(CellDataType::VECTOR_INDEX, {100, 0}); + EXPECT_EQ(r1.memory_bytes, 0); + + // Release second 100: sum 300->200, release 0 + auto r2 = tracker_.Release(CellDataType::VECTOR_INDEX, {100, 0}); + EXPECT_EQ(r2.memory_bytes, 0); + + // Release third 100: sum 200->100, release 100 + auto r3 = tracker_.Release(CellDataType::VECTOR_INDEX, {100, 0}); + EXPECT_EQ(r3.memory_bytes, 100); + + // Release fourth 100: sum 100->0, release 100 + auto r4 = tracker_.Release(CellDataType::VECTOR_INDEX, {100, 0}); + EXPECT_EQ(r4.memory_bytes, 100); +} + +TEST_F(LoadingOverheadTrackerTest, TotalReservedEqualsReleased) { + tracker_.RegisterUpperBound(CellDataType::VECTOR_INDEX, {200, 0}); + + int64_t total_reserved = 0; + int64_t total_released = 0; + + // Reserve 10 x 100 + for (int i = 0; i < 10; i++) { + total_reserved += tracker_.Reserve(CellDataType::VECTOR_INDEX, {100, 0}).memory_bytes; + } + // Should have reserved exactly UB = 200 + EXPECT_EQ(total_reserved, 200); + + // Release all 10 + for (int i = 0; i < 10; i++) { + total_released += tracker_.Release(CellDataType::VECTOR_INDEX, {100, 0}).memory_bytes; + } + // Total released should equal total reserved + EXPECT_EQ(total_released, 200); +} + +TEST_F(LoadingOverheadTrackerTest, PartialCapping) { + tracker_.RegisterUpperBound(CellDataType::VECTOR_INDEX, {200, 0}); + + // Reserve 150: sum=150 <= UB=200, full amount + auto d1 = tracker_.Reserve(CellDataType::VECTOR_INDEX, {150, 0}); + EXPECT_EQ(d1.memory_bytes, 150); + + // Reserve 100: sum=250 > UB=200, delta = 200-150 = 50 + auto d2 = tracker_.Reserve(CellDataType::VECTOR_INDEX, {100, 0}); + EXPECT_EQ(d2.memory_bytes, 50); +} + +TEST_F(LoadingOverheadTrackerTest, ReleaseUndoesReserve) { + tracker_.RegisterUpperBound(CellDataType::VECTOR_INDEX, {200, 0}); + + // Reserve 150: actual = 150 + auto d1 = tracker_.Reserve(CellDataType::VECTOR_INDEX, {150, 0}); + EXPECT_EQ(d1.memory_bytes, 150); + + // Release undoes the reserve: sum 150->0, release = 150 + auto undo = tracker_.Release(CellDataType::VECTOR_INDEX, {150, 0}); + EXPECT_EQ(undo.memory_bytes, 150); +} + +TEST_F(LoadingOverheadTrackerTest, MultipleTypes) { + tracker_.RegisterUpperBound(CellDataType::VECTOR_INDEX, {200, 0}); + tracker_.RegisterUpperBound(CellDataType::SCALAR_INDEX, {100, 0}); + + // Types are tracked independently + auto d1 = tracker_.Reserve(CellDataType::VECTOR_INDEX, {200, 0}); + EXPECT_EQ(d1.memory_bytes, 200); + + auto d2 = tracker_.Reserve(CellDataType::SCALAR_INDEX, {100, 0}); + EXPECT_EQ(d2.memory_bytes, 100); + + // Both at UB, further reserves return 0 + auto d3 = tracker_.Reserve(CellDataType::VECTOR_INDEX, {100, 0}); + EXPECT_EQ(d3.memory_bytes, 0); + + auto d4 = tracker_.Reserve(CellDataType::SCALAR_INDEX, {50, 0}); + EXPECT_EQ(d4.memory_bytes, 0); +} + +TEST_F(LoadingOverheadTrackerTest, RegisterUpperBoundTakesMax) { + tracker_.RegisterUpperBound(CellDataType::VECTOR_INDEX, {100, 50}); + tracker_.RegisterUpperBound(CellDataType::VECTOR_INDEX, {200, 30}); + + // UB should be {200, 50} (max per dimension) + auto ub = tracker_.GetUpperBound(CellDataType::VECTOR_INDEX); + EXPECT_EQ(ub.memory_bytes, 200); + EXPECT_EQ(ub.file_bytes, 50); + + auto d1 = tracker_.Reserve(CellDataType::VECTOR_INDEX, {200, 50}); + EXPECT_EQ(d1.memory_bytes, 200); + EXPECT_EQ(d1.file_bytes, 50); + + // Next reserve should be fully capped + auto d2 = tracker_.Reserve(CellDataType::VECTOR_INDEX, {100, 100}); + EXPECT_EQ(d2.memory_bytes, 0); + EXPECT_EQ(d2.file_bytes, 0); +} + +TEST_F(LoadingOverheadTrackerTest, HasFiniteUpperBound) { + EXPECT_FALSE(tracker_.HasFiniteUpperBound(CellDataType::VECTOR_INDEX)); + tracker_.RegisterUpperBound(CellDataType::VECTOR_INDEX, {200, 0}); + EXPECT_TRUE(tracker_.HasFiniteUpperBound(CellDataType::VECTOR_INDEX)); + EXPECT_FALSE(tracker_.HasFiniteUpperBound(CellDataType::SCALAR_INDEX)); +} + +TEST_F(LoadingOverheadTrackerTest, ConcurrentReserveRelease) { + tracker_.RegisterUpperBound(CellDataType::VECTOR_INDEX, {200, 0}); + + const int num_threads = 10; + const int ops_per_thread = 100; + std::vector threads; + std::atomic total_reserved{0}; + std::atomic total_released{0}; + + threads.reserve(num_threads); + for (int i = 0; i < num_threads; i++) { + threads.emplace_back([&]() { + for (int j = 0; j < ops_per_thread; j++) { + auto reserved = tracker_.Reserve(CellDataType::VECTOR_INDEX, {10, 0}); + total_reserved += reserved.memory_bytes; + } + for (int j = 0; j < ops_per_thread; j++) { + auto released = tracker_.Release(CellDataType::VECTOR_INDEX, {10, 0}); + total_released += released.memory_bytes; + } + }); + } + + for (auto& t : threads) { + t.join(); + } + + // Total reserved must equal total released + EXPECT_EQ(total_reserved.load(), total_released.load()); +} + +TEST_F(LoadingOverheadTrackerTest, DefaultUnlimitedUBFallback) { + // EnsureRegistered without explicit UB -> unlimited, behaves like no capping. + tracker_.EnsureRegistered(CellDataType::VECTOR_INDEX); + + EXPECT_FALSE(tracker_.HasFiniteUpperBound(CellDataType::VECTOR_INDEX)); + + auto d1 = tracker_.Reserve(CellDataType::VECTOR_INDEX, {1000000000, 0}); + EXPECT_EQ(d1.memory_bytes, 1000000000); + + auto d2 = tracker_.Reserve(CellDataType::VECTOR_INDEX, {2000000000, 0}); + EXPECT_EQ(d2.memory_bytes, 2000000000); + + auto r1 = tracker_.Release(CellDataType::VECTOR_INDEX, {1000000000, 0}); + EXPECT_EQ(r1.memory_bytes, 1000000000); + + auto r2 = tracker_.Release(CellDataType::VECTOR_INDEX, {2000000000, 0}); + EXPECT_EQ(r2.memory_bytes, 2000000000); +} + +TEST_F(LoadingOverheadTrackerTest, EnsureRegisteredThenRegisterFiniteUB) { + tracker_.EnsureRegistered(CellDataType::VECTOR_INDEX); + EXPECT_FALSE(tracker_.HasFiniteUpperBound(CellDataType::VECTOR_INDEX)); + + tracker_.RegisterUpperBound(CellDataType::VECTOR_INDEX, {200, 0}); + EXPECT_TRUE(tracker_.HasFiniteUpperBound(CellDataType::VECTOR_INDEX)); + + // Now capping should apply. + auto d1 = tracker_.Reserve(CellDataType::VECTOR_INDEX, {300, 0}); + EXPECT_EQ(d1.memory_bytes, 200); +} + +TEST_F(LoadingOverheadTrackerTest, UnregisteredTypeAutoCreatesUnlimited) { + auto d1 = tracker_.Reserve(CellDataType::SCALAR_FIELD, {500, 0}); + EXPECT_EQ(d1.memory_bytes, 500); + + auto r1 = tracker_.Release(CellDataType::SCALAR_FIELD, {500, 0}); + EXPECT_EQ(r1.memory_bytes, 500); + + EXPECT_FALSE(tracker_.HasFiniteUpperBound(CellDataType::SCALAR_FIELD)); +} + +TEST_F(LoadingOverheadTrackerTest, UBChangesMidFlight) { + tracker_.RegisterUpperBound(CellDataType::VECTOR_INDEX, {200, 0}); + + // Reserve 3x100 under UB=200: dlist gets 100+100+0 = 200 + auto d1 = tracker_.Reserve(CellDataType::VECTOR_INDEX, {100, 0}); + EXPECT_EQ(d1.memory_bytes, 100); + auto d2 = tracker_.Reserve(CellDataType::VECTOR_INDEX, {100, 0}); + EXPECT_EQ(d2.memory_bytes, 100); + auto d3 = tracker_.Reserve(CellDataType::VECTOR_INDEX, {100, 0}); + EXPECT_EQ(d3.memory_bytes, 0); + + int64_t total_reserved = d1.memory_bytes + d2.memory_bytes + d3.memory_bytes; + EXPECT_EQ(total_reserved, 200); + + // UB changes to 400 + tracker_.RegisterUpperBound(CellDataType::VECTOR_INDEX, {400, 0}); + + // Release all 3: should release exactly 200 total + auto r1 = tracker_.Release(CellDataType::VECTOR_INDEX, {100, 0}); + EXPECT_EQ(r1.memory_bytes, 0); + auto r2 = tracker_.Release(CellDataType::VECTOR_INDEX, {100, 0}); + EXPECT_EQ(r2.memory_bytes, 100); + auto r3 = tracker_.Release(CellDataType::VECTOR_INDEX, {100, 0}); + EXPECT_EQ(r3.memory_bytes, 100); + + int64_t total_released = r1.memory_bytes + r2.memory_bytes + r3.memory_bytes; + EXPECT_EQ(total_released, 200); + EXPECT_EQ(total_reserved, total_released); +} + +TEST_F(LoadingOverheadTrackerTest, UBDecreasesFromUnlimitedMidFlight) { + auto d1 = tracker_.Reserve(CellDataType::VECTOR_INDEX, {1000, 0}); + EXPECT_EQ(d1.memory_bytes, 1000); + auto d2 = tracker_.Reserve(CellDataType::VECTOR_INDEX, {1000, 0}); + EXPECT_EQ(d2.memory_bytes, 1000); + + // Now register finite UB=200 + tracker_.RegisterUpperBound(CellDataType::VECTOR_INDEX, {200, 0}); + + // Further reserves should be capped + auto d3 = tracker_.Reserve(CellDataType::VECTOR_INDEX, {100, 0}); + EXPECT_EQ(d3.memory_bytes, 0); + + // Release all 3: should release exactly 2000 + auto r1 = tracker_.Release(CellDataType::VECTOR_INDEX, {1000, 0}); + EXPECT_EQ(r1.memory_bytes, 1800); + auto r2 = tracker_.Release(CellDataType::VECTOR_INDEX, {1000, 0}); + EXPECT_EQ(r2.memory_bytes, 100); + auto r3 = tracker_.Release(CellDataType::VECTOR_INDEX, {100, 0}); + EXPECT_EQ(r3.memory_bytes, 100); + + int64_t total_reserved = d1.memory_bytes + d2.memory_bytes + d3.memory_bytes; + int64_t total_released = r1.memory_bytes + r2.memory_bytes + r3.memory_bytes; + EXPECT_EQ(total_reserved, 2000); + EXPECT_EQ(total_released, 2000); +} + +TEST_F(LoadingOverheadTrackerTest, GetUpperBound) { + EXPECT_EQ(tracker_.GetUpperBound(CellDataType::VECTOR_INDEX), LoadingOverheadTracker::kUnlimited); + + tracker_.RegisterUpperBound(CellDataType::VECTOR_INDEX, {200, 100}); + auto ub = tracker_.GetUpperBound(CellDataType::VECTOR_INDEX); + EXPECT_EQ(ub.memory_bytes, 200); + EXPECT_EQ(ub.file_bytes, 100); +}