Skip to content

Commit f7aa74e

Browse files
[fix](filecache) reset_range dose not update shadow queue causing large cache size (#59314)
shadown queue is copying the actual LRU queue and provide lockless acess. but the copying loses updating size when actual LRU queue is reseting range (when load data, we first allocate 1MB block for the data and reset the size to the real size when finalizing). This commit does the following to fix this problem: 1. update the corresponding shadow queue element when resetting 2. calibrate size during initial loading into memory process Signed-off-by: zhengyu <[email protected]>
1 parent 120ea03 commit f7aa74e

File tree

9 files changed

+156
-17
lines changed

9 files changed

+156
-17
lines changed

be/src/io/cache/block_file_cache.cpp

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -858,6 +858,17 @@ FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& hash, const CacheC
858858
return nullptr; /// Empty files are not cached.
859859
}
860860

861+
VLOG_DEBUG << "Adding file block to cache. size=" << size << " hash=" << hash.to_string()
862+
<< " offset=" << offset << " cache_type=" << cache_type_to_string(context.cache_type)
863+
<< " expiration_time=" << context.expiration_time
864+
<< " tablet_id=" << context.tablet_id;
865+
866+
if (size > 1024 * 1024 * 1024) {
867+
LOG(WARNING) << "File block size is too large for a block. size=" << size
868+
<< " hash=" << hash.to_string() << " offset=" << offset
869+
<< " stack:" << get_stack_trace();
870+
}
871+
861872
auto& offsets = _files[hash];
862873
auto itr = offsets.find(offset);
863874
if (itr != offsets.end()) {
@@ -1211,10 +1222,10 @@ void BlockFileCache::reset_range(const UInt128Wrapper& hash, size_t offset, size
12111222
if (cell->queue_iterator) {
12121223
auto& queue = get_queue(cell->file_block->cache_type());
12131224
DCHECK(queue.contains(hash, offset, cache_lock));
1214-
auto iter = queue.get(hash, offset, cache_lock);
1215-
iter->size = new_size;
1216-
queue.cache_size -= old_size;
1217-
queue.cache_size += new_size;
1225+
queue.resize(*cell->queue_iterator, new_size, cache_lock);
1226+
_lru_recorder->record_queue_event(cell->file_block->cache_type(), CacheLRULogType::RESIZE,
1227+
cell->file_block->get_hash_value(),
1228+
cell->file_block->offset(), new_size);
12181229
}
12191230
_cur_cache_size -= old_size;
12201231
_cur_cache_size += new_size;
@@ -1523,6 +1534,13 @@ void LRUQueue::remove_all(std::lock_guard<std::mutex>& /* cache_lock */) {
15231534
void LRUQueue::move_to_end(Iterator queue_it, std::lock_guard<std::mutex>& /* cache_lock */) {
15241535
queue.splice(queue.end(), queue, queue_it);
15251536
}
1537+
1538+
void LRUQueue::resize(Iterator queue_it, size_t new_size,
1539+
std::lock_guard<std::mutex>& /* cache_lock */) {
1540+
cache_size -= queue_it->size;
1541+
queue_it->size = new_size;
1542+
cache_size += new_size;
1543+
}
15261544
bool LRUQueue::contains(const UInt128Wrapper& hash, size_t offset,
15271545
std::lock_guard<std::mutex>& /* cache_lock */) const {
15281546
return map.find(std::make_pair(hash, offset)) != map.end();

be/src/io/cache/file_cache_common.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,8 @@ class LRUQueue {
233233

234234
void move_to_end(Iterator queue_it, std::lock_guard<std::mutex>& cache_lock);
235235

236+
void resize(Iterator queue_it, size_t new_size, std::lock_guard<std::mutex>& cache_lock);
237+
236238
std::string to_string(std::lock_guard<std::mutex>& cache_lock) const;
237239

238240
bool contains(const UInt128Wrapper& hash, size_t offset,

be/src/io/cache/fs_file_cache_storage.cpp

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -583,6 +583,31 @@ Status FSFileCacheStorage::parse_filename_suffix_to_cache_type(
583583
return Status::OK();
584584
}
585585

586+
bool FSFileCacheStorage::handle_already_loaded_block(
587+
BlockFileCache* mgr, const UInt128Wrapper& hash, size_t offset, size_t new_size,
588+
int64_t tablet_id, std::lock_guard<std::mutex>& cache_lock) const {
589+
auto file_it = mgr->_files.find(hash);
590+
if (file_it == mgr->_files.end()) {
591+
return false;
592+
}
593+
594+
auto cell_it = file_it->second.find(offset);
595+
if (cell_it == file_it->second.end()) {
596+
return false;
597+
}
598+
599+
auto block = cell_it->second.file_block;
600+
if (tablet_id != 0 && block->tablet_id() == 0) {
601+
block->set_tablet_id(tablet_id);
602+
}
603+
604+
size_t old_size = block->range().size();
605+
if (old_size != new_size) {
606+
mgr->reset_range(hash, offset, old_size, new_size, cache_lock);
607+
}
608+
return true;
609+
}
610+
586611
void FSFileCacheStorage::load_cache_info_into_memory_from_fs(BlockFileCache* _mgr) const {
587612
int scan_length = 10000;
588613
std::vector<BatchLoadArgs> batch_load_buffer;
@@ -592,8 +617,8 @@ void FSFileCacheStorage::load_cache_info_into_memory_from_fs(BlockFileCache* _mg
592617

593618
auto f = [&](const BatchLoadArgs& args) {
594619
// in async load mode, a cell may be added twice.
595-
if (_mgr->_files.contains(args.hash) && _mgr->_files[args.hash].contains(args.offset)) {
596-
// TODO(zhengyu): update type&expiration if need
620+
if (handle_already_loaded_block(_mgr, args.hash, args.offset, args.size,
621+
args.ctx.tablet_id, cache_lock)) {
597622
return;
598623
}
599624
// if the file is tmp, it means it is the old file and it should be removed
@@ -773,11 +798,8 @@ void FSFileCacheStorage::load_cache_info_into_memory_from_db(BlockFileCache* _mg
773798

774799
auto f = [&](const BatchLoadArgs& args) {
775800
// in async load mode, a cell may be added twice.
776-
if (_mgr->_files.contains(args.hash) && _mgr->_files[args.hash].contains(args.offset)) {
777-
auto block = _mgr->_files[args.hash][args.offset].file_block;
778-
if (block->tablet_id() == 0) {
779-
block->set_tablet_id(args.ctx.tablet_id);
780-
}
801+
if (handle_already_loaded_block(_mgr, args.hash, args.offset, args.size,
802+
args.ctx.tablet_id, cache_lock)) {
781803
return;
782804
}
783805
_mgr->add_cell(args.hash, args.ctx, args.offset, args.size,
@@ -920,7 +942,10 @@ void FSFileCacheStorage::load_blocks_directly_unlocked(BlockFileCache* mgr, cons
920942
context_original.cache_type = static_cast<FileCacheType>(block_meta->type);
921943
context_original.tablet_id = key.meta.tablet_id;
922944

923-
if (!mgr->_files.contains(key.hash) || !mgr->_files[key.hash].contains(key.offset)) {
945+
if (handle_already_loaded_block(mgr, key.hash, key.offset, block_meta->size, key.meta.tablet_id,
946+
cache_lock)) {
947+
return;
948+
} else {
924949
mgr->add_cell(key.hash, context_original, key.offset, block_meta->size,
925950
FileBlock::State::DOWNLOADED, cache_lock);
926951
}

be/src/io/cache/fs_file_cache_storage.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,10 @@ class FSFileCacheStorage : public FileCacheStorage {
111111

112112
void load_cache_info_into_memory(BlockFileCache* _mgr) const;
113113

114+
bool handle_already_loaded_block(BlockFileCache* mgr, const UInt128Wrapper& hash, size_t offset,
115+
size_t new_size, int64_t tablet_id,
116+
std::lock_guard<std::mutex>& cache_lock) const;
117+
114118
private:
115119
// Helper function to count files in cache directory using statfs
116120
size_t estimate_file_count_from_statfs() const;

be/src/io/cache/lru_queue_recorder.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,15 @@ void LRUQueueRecorder::replay_queue_event(FileCacheType type) {
6262
}
6363
break;
6464
}
65+
case CacheLRULogType::RESIZE: {
66+
auto it = shadow_queue.get(log->hash, log->offset, lru_log_lock);
67+
if (it != std::list<LRUQueue::FileKeyAndOffset>::iterator()) {
68+
shadow_queue.resize(it, log->size, lru_log_lock);
69+
} else {
70+
LOG(WARNING) << "RESIZE failed, doesn't exist in shadow queue";
71+
}
72+
break;
73+
}
6574
default:
6675
LOG(WARNING) << "Unknown CacheLRULogType: " << static_cast<int>(log->type);
6776
break;

be/src/io/cache/lru_queue_recorder.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ enum class CacheLRULogType {
3131
ADD = 0, // all of the integer types
3232
REMOVE = 1,
3333
MOVETOBACK = 2,
34-
INVALID = 3,
34+
RESIZE = 3,
35+
INVALID = 4,
3536
};
3637

3738
struct CacheLRULog {

be/src/util/runtime_profile.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ class RuntimeProfile {
395395
int64_t level = 2, int64_t condition = 0, int64_t value = 0)
396396
: Counter(type, value, level),
397397
_condition(condition),
398-
_value(value),
398+
_stored_value(value),
399399
_condition_func(condition_func) {}
400400

401401
Counter* clone() const override {
@@ -405,21 +405,21 @@ class RuntimeProfile {
405405

406406
int64_t value() const override {
407407
std::lock_guard<std::mutex> l(_mutex);
408-
return _value;
408+
return _stored_value;
409409
}
410410

411411
void conditional_update(int64_t c, int64_t v) {
412412
std::lock_guard<std::mutex> l(_mutex);
413413
if (_condition_func(_condition, c)) {
414-
_value = v;
414+
_stored_value = v;
415415
_condition = c;
416416
}
417417
}
418418

419419
private:
420420
mutable std::mutex _mutex;
421421
int64_t _condition;
422-
int64_t _value;
422+
int64_t _stored_value;
423423
ConditionCounterFunction _condition_func;
424424
};
425425

be/test/io/cache/block_file_cache_test_meta_store.cpp

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,75 @@ TEST_F(BlockFileCacheTest, clear_retains_meta_directory_and_clears_meta_entries)
511511
}
512512
}
513513

514+
TEST_F(BlockFileCacheTest, handle_already_loaded_block_updates_size_and_tablet) {
515+
config::enable_evict_file_cache_in_advance = false;
516+
if (fs::exists(cache_base_path)) {
517+
fs::remove_all(cache_base_path);
518+
}
519+
fs::create_directories(cache_base_path);
520+
521+
io::FileCacheSettings settings;
522+
settings.ttl_queue_size = 5000000;
523+
settings.ttl_queue_elements = 50000;
524+
settings.query_queue_size = 5000000;
525+
settings.query_queue_elements = 50000;
526+
settings.index_queue_size = 5000000;
527+
settings.index_queue_elements = 50000;
528+
settings.disposable_queue_size = 5000000;
529+
settings.disposable_queue_elements = 50000;
530+
settings.capacity = 20000000;
531+
settings.max_file_block_size = 100000;
532+
settings.max_query_cache_size = 30;
533+
534+
io::BlockFileCache cache(cache_base_path, settings);
535+
ASSERT_TRUE(cache.initialize());
536+
for (int i = 0; i < 100; ++i) {
537+
if (cache.get_async_open_success()) {
538+
break;
539+
}
540+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
541+
}
542+
ASSERT_TRUE(cache.get_async_open_success());
543+
544+
io::CacheContext context;
545+
ReadStatistics rstats;
546+
context.stats = &rstats;
547+
context.cache_type = io::FileCacheType::NORMAL;
548+
context.query_id.hi = 11;
549+
context.query_id.lo = 12;
550+
context.tablet_id = 0;
551+
auto key = io::BlockFileCache::hash("sync_cached_block_meta_key");
552+
553+
constexpr size_t kOriginalSize = 100000;
554+
auto holder = cache.get_or_set(key, 0, kOriginalSize, context);
555+
auto blocks = fromHolder(holder);
556+
ASSERT_EQ(blocks.size(), 1);
557+
ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id());
558+
download(blocks[0], kOriginalSize);
559+
blocks.clear();
560+
561+
auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(cache._storage.get());
562+
ASSERT_NE(fs_storage, nullptr) << "Expected FSFileCacheStorage but got different storage type";
563+
564+
constexpr size_t kNewSize = 2 * kOriginalSize;
565+
constexpr int64_t kTabletId = 4242;
566+
bool handled = false;
567+
{
568+
SCOPED_CACHE_LOCK(cache._mutex, (&cache));
569+
handled = fs_storage->handle_already_loaded_block(&cache, key, 0, kNewSize, kTabletId,
570+
cache_lock);
571+
}
572+
573+
ASSERT_TRUE(handled);
574+
auto& cell = cache._files[key][0];
575+
EXPECT_EQ(cell.file_block->tablet_id(), kTabletId);
576+
EXPECT_EQ(cache._cur_cache_size, kNewSize);
577+
EXPECT_EQ(cache._normal_queue.get_capacity_unsafe(), kNewSize);
578+
579+
if (fs::exists(cache_base_path)) {
580+
fs::remove_all(cache_base_path);
581+
}
582+
}
514583
TEST_F(BlockFileCacheTest, estimate_file_count_skips_removed_directory) {
515584
std::string test_dir = cache_base_path + "/estimate_file_count_removed_dir";
516585
if (fs::exists(test_dir)) {

be/test/io/cache/lru_queue_test.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,3 +115,14 @@ TEST_F(LRUQueueTest, SameElementsDifferentOrder) {
115115

116116
EXPECT_EQ(queue1->levenshtein_distance_from(*queue2, lock), 2);
117117
}
118+
119+
TEST_F(LRUQueueTest, ResizeUpdatesCacheSize) {
120+
std::mutex mutex;
121+
std::lock_guard lock(mutex);
122+
123+
auto iter = queue1->add(UInt128Wrapper(123), 0, 1024, lock);
124+
EXPECT_EQ(queue1->get_capacity(lock), 1024);
125+
126+
queue1->resize(iter, 2048, lock);
127+
EXPECT_EQ(queue1->get_capacity(lock), 2048);
128+
}

0 commit comments

Comments
 (0)