Skip to content

Commit a2a2108

Browse files
committed
enhance: add per-type loading overhead upper bound for CachingLayer
Add LoadingOverheadTracker to cap the total DList loading resource reservation for temporary loading overhead (e.g. preprocessing buffers) per CellDataType. The tracker uses a delta-based model: - Reserve() returns the incremental amount to reserve from DList - Release() returns the incremental amount to release back to DList - Total overhead reserved = min(sum_of_overhead, upper_bound) This prevents over-reservation when many cells load concurrently, since actual peak resource usage is bounded by pool_size * cell_size. Changes: - New LoadingOverheadTracker class with per-type state tracking - Translator::estimated_byte_size_of_cell now returns {loaded, loading} pair to separate final cache usage from temporary loading overhead - CacheSlot::RunLoad integrates with tracker for reserve/release - Manager passes tracker to CacheSlots and supports UB registration - ~300 lines of unit tests for the tracker Signed-off-by: Shawn Wang <shawn.wang@zilliz.com>
1 parent dbe249a commit a2a2108

File tree

7 files changed

+667
-43
lines changed

7 files changed

+667
-43
lines changed

include/cachinglayer/CacheSlot.h

Lines changed: 91 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include <utility>
3030
#include <vector>
3131

32+
#include "cachinglayer/LoadingOverheadTracker.h"
3233
#include "cachinglayer/Metrics.h"
3334
#include "cachinglayer/Translator.h"
3435
#include "cachinglayer/Utils.h"
@@ -54,7 +55,8 @@ class CacheSlot final : public std::enable_shared_from_this<CacheSlot<CellT>> {
5455
"representing the memory consumption of the cell");
5556

5657
CacheSlot(std::unique_ptr<Translator<CellT>> translator, internal::DList* dlist, bool evictable, bool self_reserve,
57-
bool storage_usage_tracking_enabled, std::chrono::milliseconds loading_timeout)
58+
bool storage_usage_tracking_enabled, std::chrono::milliseconds loading_timeout,
59+
LoadingOverheadTracker* loading_overhead_tracker = nullptr)
5860
: translator_(std::move(translator)),
5961
cell_id_mapping_mode_(translator_->meta()->cell_id_mapping_mode),
6062
cell_data_type_(translator_->meta()->cell_data_type),
@@ -63,7 +65,8 @@ class CacheSlot final : public std::enable_shared_from_this<CacheSlot<CellT>> {
6365
evictable_(evictable),
6466
self_reserve_(self_reserve),
6567
storage_usage_tracking_enabled_(storage_usage_tracking_enabled),
66-
loading_timeout_(loading_timeout) {
68+
loading_timeout_(loading_timeout),
69+
loading_overhead_tracker_(loading_overhead_tracker) {
6770
cells_.reserve(translator_->num_cells());
6871
for (cid_t i = 0; i < static_cast<cid_t>(translator_->num_cells()); ++i) {
6972
cells_.push_back(std::make_unique<CacheCell>(this, i));
@@ -407,9 +410,18 @@ class CacheSlot final : public std::enable_shared_from_this<CacheSlot<CellT>> {
407410

408411
void
409412
RunLoad(OpContext* ctx, std::unordered_set<cid_t>&& cids, std::chrono::milliseconds timeout) {
413+
// loaded_resource: the estimated final resource usage (from .first), reserved unconditionally.
414+
// loading_resource: the estimated temporary overhead during loading (from .second),
415+
// capped at per-type UB via LoadingOverheadTracker. The tracker returns the incremental
416+
// delta to reserve from DList, so total loading in DList = min(sum, UB) per type.
417+
ResourceUsage essential_loaded_resource{};
410418
ResourceUsage essential_loading_resource{};
419+
ResourceUsage bonus_loaded_resource{};
411420
ResourceUsage bonus_loading_resource{};
412421
std::vector<cid_t> loading_cids;
422+
// Tracks the loading overhead currently reserved in the tracker but not yet
423+
// guarded by defer_release. Used to clean up tracker state on exception.
424+
ResourceUsage tracker_pending_loading{};
413425
try {
414426
auto start = std::chrono::steady_clock::now();
415427
bool reservation_success = false;
@@ -439,16 +451,31 @@ class CacheSlot final : public std::enable_shared_from_this<CacheSlot<CellT>> {
439451
auto bonus_cids = translator_->bonus_cells_to_be_loaded(loading_cids);
440452

441453
for (auto& cid : loading_cids) {
442-
essential_loading_resource += translator_->estimated_byte_size_of_cell(cid).second;
454+
auto [loaded, loading] = translator_->estimated_byte_size_of_cell(cid);
455+
essential_loaded_resource += loaded;
456+
essential_loading_resource += loading;
443457
}
444458

445459
for (auto& cid : bonus_cids) {
446-
bonus_loading_resource += translator_->estimated_byte_size_of_cell(cid).second;
460+
auto [loaded, loading] = translator_->estimated_byte_size_of_cell(cid);
461+
bonus_loaded_resource += loaded;
462+
bonus_loading_resource += loading;
447463
}
448464

449-
auto resource_needed_for_loading = essential_loading_resource + bonus_loading_resource;
465+
// loaded_resource is reserved unconditionally from DList (no capping).
466+
// loading_resource goes through the tracker: returns delta = change in min(sum, UB).
467+
auto loaded_resource = essential_loaded_resource + bonus_loaded_resource;
468+
auto loading_resource = essential_loading_resource + bonus_loading_resource;
469+
470+
auto loading_delta = loading_resource;
471+
if (loading_overhead_tracker_) {
472+
loading_delta = loading_overhead_tracker_->Reserve(cell_data_type_, loading_resource);
473+
tracker_pending_loading = loading_resource;
474+
}
475+
auto actual_dlist_reserve = loaded_resource + loading_delta;
476+
450477
reservation_success =
451-
SemiInlineGet(dlist_->ReserveLoadingResourceWithTimeout(resource_needed_for_loading, timeout, ctx));
478+
SemiInlineGet(dlist_->ReserveLoadingResourceWithTimeout(actual_dlist_reserve, timeout, ctx));
452479

453480
if (!bonus_cids.empty()) {
454481
// if the reservation failed, try to reserve only the essential loading resource
@@ -457,66 +484,100 @@ class CacheSlot final : public std::enable_shared_from_this<CacheSlot<CellT>> {
457484
"[MCL] CacheSlot reserve loading resource with bonus cells failed, try to reserve only "
458485
"essential "
459486
"loading resource");
460-
resource_needed_for_loading = essential_loading_resource;
461-
reservation_success = SemiInlineGet(
462-
dlist_->ReserveLoadingResourceWithTimeout(resource_needed_for_loading, timeout, ctx));
487+
// Undo the tracker state for the full reservation
488+
if (loading_overhead_tracker_) {
489+
loading_overhead_tracker_->Release(cell_data_type_, loading_resource);
490+
tracker_pending_loading = {};
491+
}
492+
loaded_resource = essential_loaded_resource;
493+
loading_resource = essential_loading_resource;
494+
if (loading_overhead_tracker_) {
495+
loading_delta = loading_overhead_tracker_->Reserve(cell_data_type_, loading_resource);
496+
tracker_pending_loading = loading_resource;
497+
} else {
498+
loading_delta = loading_resource;
499+
}
500+
actual_dlist_reserve = loaded_resource + loading_delta;
501+
reservation_success =
502+
SemiInlineGet(dlist_->ReserveLoadingResourceWithTimeout(actual_dlist_reserve, timeout, ctx));
463503
} else {
464504
// if the reservation succeeded, we can load the bonus cells
465505
loading_cids.insert(loading_cids.end(), bonus_cids.begin(), bonus_cids.end());
466506
}
467507
}
468508

469509
if (!reservation_success) {
510+
// Undo tracker state on failure
511+
if (loading_overhead_tracker_) {
512+
loading_overhead_tracker_->Release(cell_data_type_, loading_resource);
513+
tracker_pending_loading = {};
514+
}
470515
LOG_ERROR(
471516
"[MCL] CacheSlot failed to reserve resource for "
472-
"cells: key={}, cell_ids=[{}], total "
473-
"resource_needed_for_loading={}",
474-
translator_->key(), fmt::join(loading_cids, ","), resource_needed_for_loading.ToString());
517+
"cells: key={}, cell_ids=[{}], "
518+
"loaded_resource={}, loading_resource={}, actual_dlist_reserve={}",
519+
translator_->key(), fmt::join(loading_cids, ","), loaded_resource.ToString(),
520+
loading_resource.ToString(), actual_dlist_reserve.ToString());
475521
ThrowInfo(ErrorCode::InsufficientResource,
476522
"[MCL] CacheSlot failed to reserve resource for "
477-
"cells: key={}, cell_ids=[{}], total "
478-
"resource_needed_for_loading={}",
479-
translator_->key(), fmt::join(loading_cids, ","), resource_needed_for_loading.ToString());
523+
"cells: key={}, cell_ids=[{}], "
524+
"loaded_resource={}, loading_resource={}, actual_dlist_reserve={}",
525+
translator_->key(), fmt::join(loading_cids, ","), loaded_resource.ToString(),
526+
loading_resource.ToString(), actual_dlist_reserve.ToString());
480527
}
481528

482529
monitor::cache_loading_bytes(cell_data_type_, StorageType::MEMORY)
483-
.Increment(resource_needed_for_loading.memory_bytes);
484-
monitor::cache_loading_bytes(cell_data_type_, StorageType::DISK)
485-
.Increment(resource_needed_for_loading.file_bytes);
530+
.Increment(actual_dlist_reserve.memory_bytes);
531+
monitor::cache_loading_bytes(cell_data_type_, StorageType::DISK).Increment(actual_dlist_reserve.file_bytes);
486532
monitor::cache_cell_loading_count(cell_data_type_, storage_type_).Increment(loading_cids.size());
487533

488-
// defer release resource_needed_for_loading
489-
auto defer_release = folly::makeGuard([this, &resource_needed_for_loading, &loading_cids]() {
534+
// defer release: loaded_resource + loading delta from tracker.
535+
// Once created, defer_release owns the tracker cleanup — clear tracker_pending_loading.
536+
auto loading_cids_count = loading_cids.size();
537+
auto defer_release = folly::makeGuard([this, loaded_resource, loading_resource, loading_cids_count]() {
490538
try {
491-
dlist_->ReleaseLoadingResource(resource_needed_for_loading);
492-
monitor::cache_cell_loading_count(cell_data_type_, storage_type_).Decrement(loading_cids.size());
539+
auto release_delta = loading_resource;
540+
if (loading_overhead_tracker_) {
541+
release_delta = loading_overhead_tracker_->Release(cell_data_type_, loading_resource);
542+
}
543+
auto dlist_release = loaded_resource + release_delta;
544+
dlist_->ReleaseLoadingResource(dlist_release);
545+
monitor::cache_cell_loading_count(cell_data_type_, storage_type_).Decrement(loading_cids_count);
493546
monitor::cache_loading_bytes(cell_data_type_, StorageType::MEMORY)
494-
.Decrement(resource_needed_for_loading.memory_bytes);
547+
.Decrement(dlist_release.memory_bytes);
495548
monitor::cache_loading_bytes(cell_data_type_, StorageType::DISK)
496-
.Decrement(resource_needed_for_loading.file_bytes);
549+
.Decrement(dlist_release.file_bytes);
497550
} catch (...) {
498551
auto exception = std::current_exception();
499552
auto ew = folly::exception_wrapper(exception);
500553
LOG_ERROR(
501-
"[MCL] CacheSlot failed to release loading resource for cells with exception, something must "
554+
"[MCL] CacheSlot failed to release loading resource for cells with exception, something "
555+
"must "
502556
"be wrong: "
503557
"key={}, "
504-
"loading_cids=[{}], error={}",
505-
translator_->key(), fmt::join(loading_cids, ","), ew.what());
558+
"loading_cids_count={}, error={}",
559+
translator_->key(), loading_cids_count, ew.what());
506560
}
507561
});
562+
tracker_pending_loading = {};
508563

509564
LOG_TRACE(
510565
"[MCL] CacheSlot reserveLoadingResourceWithTimeout {} sec "
511-
"result: {} time: {} sec, resource_needed: {}, key: {}",
566+
"result: {} time: {} sec, loaded_resource: {}, loading_resource: {}, "
567+
"actual_dlist_reserve: {}, key: {}",
512568
timeout.count() / 1000.0, reservation_success ? "success" : "failed",
513569
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start)
514570
.count() *
515571
1.0 / 1000,
516-
resource_needed_for_loading.ToString(), translator_->key());
572+
loaded_resource.ToString(), loading_resource.ToString(), actual_dlist_reserve.ToString(),
573+
translator_->key());
517574

518575
run_load_internal();
519576
} catch (...) {
577+
// Clean up tracker state if not yet taken over by defer_release
578+
if (loading_overhead_tracker_ && tracker_pending_loading.AnyGTZero()) {
579+
loading_overhead_tracker_->Release(cell_data_type_, tracker_pending_loading);
580+
}
520581
auto exception = std::current_exception();
521582
auto ew = folly::exception_wrapper(exception);
522583
monitor::cache_load_event_fail_total(cell_data_type_, storage_type_).Increment();
@@ -646,6 +707,7 @@ class CacheSlot final : public std::enable_shared_from_this<CacheSlot<CellT>> {
646707
const bool self_reserve_;
647708
const bool storage_usage_tracking_enabled_;
648709
std::chrono::milliseconds loading_timeout_{100000};
710+
LoadingOverheadTracker* loading_overhead_tracker_{nullptr};
649711
std::atomic<bool> warmup_called_{false};
650712
std::atomic<bool> skip_pin_{false};
651713
};

0 commit comments

Comments
 (0)