Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,16 @@ target_link_libraries(milvus-common PUBLIC

target_link_libraries(milvus-common PUBLIC OpenMP::OpenMP_CXX)

# thread_pool.cc calls the OpenBLAS-only function openblas_set_num_threads (guarded by
# OPENBLAS_OS_LINUX), so link OpenBLAS into milvus-common whenever it can be located.
# Apple platforms are skipped entirely; no lookup is attempted there.
if(NOT APPLE)
  find_library(OPENBLAS_LIBRARY NAMES openblas)
  if(NOT OPENBLAS_LIBRARY)
    # A missing OpenBLAS is not fatal: configuration continues with only a warning.
    message(WARNING "OpenBLAS not found, openblas_set_num_threads will be unavailable")
  else()
    target_link_libraries(milvus-common PUBLIC ${OPENBLAS_LIBRARY})
  endif()
endif()

if(WITH_COMMON_UT)
# Enable unit-test-only helpers for caching layer
target_compile_definitions(milvus-common PRIVATE MCL_ENABLE_TEST_CGROUP_OVERRIDE)
Expand Down
120 changes: 91 additions & 29 deletions include/cachinglayer/CacheSlot.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <utility>
#include <vector>

#include "cachinglayer/LoadingOverheadTracker.h"
#include "cachinglayer/Metrics.h"
#include "cachinglayer/Translator.h"
#include "cachinglayer/Utils.h"
Expand All @@ -54,7 +55,8 @@ class CacheSlot final : public std::enable_shared_from_this<CacheSlot<CellT>> {
"representing the memory consumption of the cell");

CacheSlot(std::unique_ptr<Translator<CellT>> translator, internal::DList* dlist, bool evictable, bool self_reserve,
bool storage_usage_tracking_enabled, std::chrono::milliseconds loading_timeout)
bool storage_usage_tracking_enabled, std::chrono::milliseconds loading_timeout,
LoadingOverheadTracker* loading_overhead_tracker = nullptr)
: translator_(std::move(translator)),
cell_id_mapping_mode_(translator_->meta()->cell_id_mapping_mode),
cell_data_type_(translator_->meta()->cell_data_type),
Expand All @@ -63,7 +65,8 @@ class CacheSlot final : public std::enable_shared_from_this<CacheSlot<CellT>> {
evictable_(evictable),
self_reserve_(self_reserve),
storage_usage_tracking_enabled_(storage_usage_tracking_enabled),
loading_timeout_(loading_timeout) {
loading_timeout_(loading_timeout),
loading_overhead_tracker_(loading_overhead_tracker) {
cells_.reserve(translator_->num_cells());
for (cid_t i = 0; i < static_cast<cid_t>(translator_->num_cells()); ++i) {
cells_.push_back(std::make_unique<CacheCell>(this, i));
Expand Down Expand Up @@ -407,9 +410,18 @@ class CacheSlot final : public std::enable_shared_from_this<CacheSlot<CellT>> {

void
RunLoad(OpContext* ctx, std::unordered_set<cid_t>&& cids, std::chrono::milliseconds timeout) {
// loaded_resource: the estimated final resource usage (from .first), reserved unconditionally.
// loading_resource: the estimated temporary overhead during loading (from .second),
// capped at per-type UB via LoadingOverheadTracker. The tracker returns the incremental
// delta to reserve from DList, so total loading in DList = min(sum, UB) per type.
ResourceUsage essential_loaded_resource{};
ResourceUsage essential_loading_resource{};
ResourceUsage bonus_loaded_resource{};
ResourceUsage bonus_loading_resource{};
std::vector<cid_t> loading_cids;
// Tracks the loading overhead currently reserved in the tracker but not yet
// guarded by defer_release. Used to clean up tracker state on exception.
ResourceUsage tracker_pending_loading{};
try {
auto start = std::chrono::steady_clock::now();
bool reservation_success = false;
Expand Down Expand Up @@ -439,16 +451,31 @@ class CacheSlot final : public std::enable_shared_from_this<CacheSlot<CellT>> {
auto bonus_cids = translator_->bonus_cells_to_be_loaded(loading_cids);

for (auto& cid : loading_cids) {
essential_loading_resource += translator_->estimated_byte_size_of_cell(cid).second;
auto [loaded, loading] = translator_->estimated_byte_size_of_cell(cid);
essential_loaded_resource += loaded;
essential_loading_resource += loading;
}

for (auto& cid : bonus_cids) {
bonus_loading_resource += translator_->estimated_byte_size_of_cell(cid).second;
auto [loaded, loading] = translator_->estimated_byte_size_of_cell(cid);
bonus_loaded_resource += loaded;
bonus_loading_resource += loading;
}

auto resource_needed_for_loading = essential_loading_resource + bonus_loading_resource;
// loaded_resource is reserved unconditionally from DList (no capping).
// loading_resource goes through the tracker: returns delta = change in min(sum, UB).
auto loaded_resource = essential_loaded_resource + bonus_loaded_resource;
auto loading_resource = essential_loading_resource + bonus_loading_resource;

auto loading_delta = loading_resource;
if (loading_overhead_tracker_) {
loading_delta = loading_overhead_tracker_->Reserve(cell_data_type_, loading_resource);
tracker_pending_loading = loading_resource;
}
auto actual_dlist_reserve = loaded_resource + loading_delta;

reservation_success =
SemiInlineGet(dlist_->ReserveLoadingResourceWithTimeout(resource_needed_for_loading, timeout, ctx));
SemiInlineGet(dlist_->ReserveLoadingResourceWithTimeout(actual_dlist_reserve, timeout, ctx));

if (!bonus_cids.empty()) {
// if the reservation failed, try to reserve only the essential loading resource
Expand All @@ -457,66 +484,100 @@ class CacheSlot final : public std::enable_shared_from_this<CacheSlot<CellT>> {
"[MCL] CacheSlot reserve loading resource with bonus cells failed, try to reserve only "
"essential "
"loading resource");
resource_needed_for_loading = essential_loading_resource;
reservation_success = SemiInlineGet(
dlist_->ReserveLoadingResourceWithTimeout(resource_needed_for_loading, timeout, ctx));
// Undo the tracker state for the full reservation
if (loading_overhead_tracker_) {
loading_overhead_tracker_->Release(cell_data_type_, loading_resource);
tracker_pending_loading = {};
}
loaded_resource = essential_loaded_resource;
loading_resource = essential_loading_resource;
if (loading_overhead_tracker_) {
loading_delta = loading_overhead_tracker_->Reserve(cell_data_type_, loading_resource);
tracker_pending_loading = loading_resource;
} else {
loading_delta = loading_resource;
}
actual_dlist_reserve = loaded_resource + loading_delta;
reservation_success =
SemiInlineGet(dlist_->ReserveLoadingResourceWithTimeout(actual_dlist_reserve, timeout, ctx));
} else {
// if the reservation succeeded, we can load the bonus cells
loading_cids.insert(loading_cids.end(), bonus_cids.begin(), bonus_cids.end());
}
}

if (!reservation_success) {
// Undo tracker state on failure
if (loading_overhead_tracker_) {
loading_overhead_tracker_->Release(cell_data_type_, loading_resource);
tracker_pending_loading = {};
}
LOG_ERROR(
"[MCL] CacheSlot failed to reserve resource for "
"cells: key={}, cell_ids=[{}], total "
"resource_needed_for_loading={}",
translator_->key(), fmt::join(loading_cids, ","), resource_needed_for_loading.ToString());
"cells: key={}, cell_ids=[{}], "
"loaded_resource={}, loading_resource={}, actual_dlist_reserve={}",
translator_->key(), fmt::join(loading_cids, ","), loaded_resource.ToString(),
loading_resource.ToString(), actual_dlist_reserve.ToString());
ThrowInfo(ErrorCode::InsufficientResource,
"[MCL] CacheSlot failed to reserve resource for "
"cells: key={}, cell_ids=[{}], total "
"resource_needed_for_loading={}",
translator_->key(), fmt::join(loading_cids, ","), resource_needed_for_loading.ToString());
"cells: key={}, cell_ids=[{}], "
"loaded_resource={}, loading_resource={}, actual_dlist_reserve={}",
translator_->key(), fmt::join(loading_cids, ","), loaded_resource.ToString(),
loading_resource.ToString(), actual_dlist_reserve.ToString());
}

monitor::cache_loading_bytes(cell_data_type_, StorageType::MEMORY)
.Increment(resource_needed_for_loading.memory_bytes);
monitor::cache_loading_bytes(cell_data_type_, StorageType::DISK)
.Increment(resource_needed_for_loading.file_bytes);
.Increment(actual_dlist_reserve.memory_bytes);
monitor::cache_loading_bytes(cell_data_type_, StorageType::DISK).Increment(actual_dlist_reserve.file_bytes);
monitor::cache_cell_loading_count(cell_data_type_, storage_type_).Increment(loading_cids.size());

// defer release resource_needed_for_loading
auto defer_release = folly::makeGuard([this, &resource_needed_for_loading, &loading_cids]() {
// defer release: loaded_resource + loading delta from tracker.
// Once created, defer_release owns the tracker cleanup — clear tracker_pending_loading.
auto loading_cids_count = loading_cids.size();
auto defer_release = folly::makeGuard([this, loaded_resource, loading_resource, loading_cids_count]() {
try {
dlist_->ReleaseLoadingResource(resource_needed_for_loading);
monitor::cache_cell_loading_count(cell_data_type_, storage_type_).Decrement(loading_cids.size());
auto release_delta = loading_resource;
if (loading_overhead_tracker_) {
release_delta = loading_overhead_tracker_->Release(cell_data_type_, loading_resource);
}
auto dlist_release = loaded_resource + release_delta;
dlist_->ReleaseLoadingResource(dlist_release);
monitor::cache_cell_loading_count(cell_data_type_, storage_type_).Decrement(loading_cids_count);
monitor::cache_loading_bytes(cell_data_type_, StorageType::MEMORY)
.Decrement(resource_needed_for_loading.memory_bytes);
.Decrement(dlist_release.memory_bytes);
monitor::cache_loading_bytes(cell_data_type_, StorageType::DISK)
.Decrement(resource_needed_for_loading.file_bytes);
.Decrement(dlist_release.file_bytes);
} catch (...) {
auto exception = std::current_exception();
auto ew = folly::exception_wrapper(exception);
LOG_ERROR(
"[MCL] CacheSlot failed to release loading resource for cells with exception, something must "
"[MCL] CacheSlot failed to release loading resource for cells with exception, something "
"must "
"be wrong: "
"key={}, "
"loading_cids=[{}], error={}",
translator_->key(), fmt::join(loading_cids, ","), ew.what());
"loading_cids_count={}, error={}",
translator_->key(), loading_cids_count, ew.what());
}
});
tracker_pending_loading = {};

LOG_TRACE(
"[MCL] CacheSlot reserveLoadingResourceWithTimeout {} sec "
"result: {} time: {} sec, resource_needed: {}, key: {}",
"result: {} time: {} sec, loaded_resource: {}, loading_resource: {}, "
"actual_dlist_reserve: {}, key: {}",
timeout.count() / 1000.0, reservation_success ? "success" : "failed",
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start)
.count() *
1.0 / 1000,
resource_needed_for_loading.ToString(), translator_->key());
loaded_resource.ToString(), loading_resource.ToString(), actual_dlist_reserve.ToString(),
translator_->key());

run_load_internal();
} catch (...) {
// Clean up tracker state if not yet taken over by defer_release
if (loading_overhead_tracker_ && tracker_pending_loading.AnyGTZero()) {
loading_overhead_tracker_->Release(cell_data_type_, tracker_pending_loading);
}
auto exception = std::current_exception();
auto ew = folly::exception_wrapper(exception);
monitor::cache_load_event_fail_total(cell_data_type_, storage_type_).Increment();
Expand Down Expand Up @@ -646,6 +707,7 @@ class CacheSlot final : public std::enable_shared_from_this<CacheSlot<CellT>> {
const bool self_reserve_;
const bool storage_usage_tracking_enabled_;
std::chrono::milliseconds loading_timeout_{100000};
LoadingOverheadTracker* loading_overhead_tracker_{nullptr};
std::atomic<bool> warmup_called_{false};
std::atomic<bool> skip_pin_{false};
};
Expand Down
Loading
Loading