diff --git a/be/src/io/cache/block_file_cache.cpp b/be/src/io/cache/block_file_cache.cpp index 4ee923e080e7be..135e87baa7b0a5 100644 --- a/be/src/io/cache/block_file_cache.cpp +++ b/be/src/io/cache/block_file_cache.cpp @@ -858,6 +858,17 @@ FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& hash, const CacheC return nullptr; /// Empty files are not cached. } + VLOG_DEBUG << "Adding file block to cache. size=" << size << " hash=" << hash.to_string() + << " offset=" << offset << " cache_type=" << cache_type_to_string(context.cache_type) + << " expiration_time=" << context.expiration_time + << " tablet_id=" << context.tablet_id; + + if (size > 1024 * 1024 * 1024) { + LOG(WARNING) << "File block size is too large for a block. size=" << size + << " hash=" << hash.to_string() << " offset=" << offset + << " stack:" << get_stack_trace(); + } + auto& offsets = _files[hash]; auto itr = offsets.find(offset); if (itr != offsets.end()) { @@ -1211,10 +1222,10 @@ void BlockFileCache::reset_range(const UInt128Wrapper& hash, size_t offset, size if (cell->queue_iterator) { auto& queue = get_queue(cell->file_block->cache_type()); DCHECK(queue.contains(hash, offset, cache_lock)); - auto iter = queue.get(hash, offset, cache_lock); - iter->size = new_size; - queue.cache_size -= old_size; - queue.cache_size += new_size; + queue.resize(*cell->queue_iterator, new_size, cache_lock); + _lru_recorder->record_queue_event(cell->file_block->cache_type(), CacheLRULogType::RESIZE, + cell->file_block->get_hash_value(), + cell->file_block->offset(), new_size); } _cur_cache_size -= old_size; _cur_cache_size += new_size; @@ -1523,6 +1534,13 @@ void LRUQueue::remove_all(std::lock_guard& /* cache_lock */) { void LRUQueue::move_to_end(Iterator queue_it, std::lock_guard& /* cache_lock */) { queue.splice(queue.end(), queue, queue_it); } + +void LRUQueue::resize(Iterator queue_it, size_t new_size, + std::lock_guard& /* cache_lock */) { + cache_size -= queue_it->size; + queue_it->size = new_size; + cache_size += new_size; +} bool LRUQueue::contains(const UInt128Wrapper& hash, size_t offset, std::lock_guard& /* cache_lock */) const { return map.find(std::make_pair(hash, offset)) != map.end(); diff --git a/be/src/io/cache/file_cache_common.h b/be/src/io/cache/file_cache_common.h index 69427ec34928d2..96cde89f6c3d9d 100644 --- a/be/src/io/cache/file_cache_common.h +++ b/be/src/io/cache/file_cache_common.h @@ -233,6 +233,8 @@ class LRUQueue { void move_to_end(Iterator queue_it, std::lock_guard& cache_lock); + void resize(Iterator queue_it, size_t new_size, std::lock_guard& cache_lock); + std::string to_string(std::lock_guard& cache_lock) const; bool contains(const UInt128Wrapper& hash, size_t offset, diff --git a/be/src/io/cache/fs_file_cache_storage.cpp b/be/src/io/cache/fs_file_cache_storage.cpp index 0860265e181bb3..53e4f0c4dd9fb0 100644 --- a/be/src/io/cache/fs_file_cache_storage.cpp +++ b/be/src/io/cache/fs_file_cache_storage.cpp @@ -583,6 +583,31 @@ Status FSFileCacheStorage::parse_filename_suffix_to_cache_type( return Status::OK(); } +bool FSFileCacheStorage::handle_already_loaded_block( + BlockFileCache* mgr, const UInt128Wrapper& hash, size_t offset, size_t new_size, + int64_t tablet_id, std::lock_guard& cache_lock) const { + auto file_it = mgr->_files.find(hash); + if (file_it == mgr->_files.end()) { + return false; + } + + auto cell_it = file_it->second.find(offset); + if (cell_it == file_it->second.end()) { + return false; + } + + auto block = cell_it->second.file_block; + if (tablet_id != 0 && block->tablet_id() == 0) { + block->set_tablet_id(tablet_id); + } + + size_t old_size = block->range().size(); + if (old_size != new_size) { + mgr->reset_range(hash, offset, old_size, new_size, cache_lock); + } + return true; +} + void FSFileCacheStorage::load_cache_info_into_memory_from_fs(BlockFileCache* _mgr) const { int scan_length = 10000; std::vector batch_load_buffer; @@ -592,8 +617,8 @@ void FSFileCacheStorage::load_cache_info_into_memory_from_fs(BlockFileCache* _mg auto f = [&](const BatchLoadArgs& args) { // in async load mode, a cell may be added twice. - if (_mgr->_files.contains(args.hash) && _mgr->_files[args.hash].contains(args.offset)) { - // TODO(zhengyu): update type&expiration if need + if (handle_already_loaded_block(_mgr, args.hash, args.offset, args.size, + args.ctx.tablet_id, cache_lock)) { return; } // if the file is tmp, it means it is the old file and it should be removed @@ -773,11 +798,8 @@ void FSFileCacheStorage::load_cache_info_into_memory_from_db(BlockFileCache* _mg auto f = [&](const BatchLoadArgs& args) { // in async load mode, a cell may be added twice. - if (_mgr->_files.contains(args.hash) && _mgr->_files[args.hash].contains(args.offset)) { - auto block = _mgr->_files[args.hash][args.offset].file_block; - if (block->tablet_id() == 0) { - block->set_tablet_id(args.ctx.tablet_id); - } + if (handle_already_loaded_block(_mgr, args.hash, args.offset, args.size, + args.ctx.tablet_id, cache_lock)) { return; } _mgr->add_cell(args.hash, args.ctx, args.offset, args.size, @@ -920,7 +942,10 @@ void FSFileCacheStorage::load_blocks_directly_unlocked(BlockFileCache* mgr, cons context_original.cache_type = static_cast(block_meta->type); context_original.tablet_id = key.meta.tablet_id; - if (!mgr->_files.contains(key.hash) || !mgr->_files[key.hash].contains(key.offset)) { + if (handle_already_loaded_block(mgr, key.hash, key.offset, block_meta->size, key.meta.tablet_id, + cache_lock)) { + return; + } else { mgr->add_cell(key.hash, context_original, key.offset, block_meta->size, FileBlock::State::DOWNLOADED, cache_lock); } diff --git a/be/src/io/cache/fs_file_cache_storage.h b/be/src/io/cache/fs_file_cache_storage.h index ea5695438c6090..d486552d2b62d7 100644 --- a/be/src/io/cache/fs_file_cache_storage.h +++ b/be/src/io/cache/fs_file_cache_storage.h @@ -111,6 +111,10 @@ class FSFileCacheStorage : public FileCacheStorage { void load_cache_info_into_memory(BlockFileCache* _mgr) const; + bool handle_already_loaded_block(BlockFileCache* mgr, const UInt128Wrapper& hash, size_t offset, + size_t new_size, int64_t tablet_id, + std::lock_guard& cache_lock) const; + private: // Helper function to count files in cache directory using statfs size_t estimate_file_count_from_statfs() const; diff --git a/be/src/io/cache/lru_queue_recorder.cpp b/be/src/io/cache/lru_queue_recorder.cpp index 8308a2a73ad6e3..9907e58cb2a607 100644 --- a/be/src/io/cache/lru_queue_recorder.cpp +++ b/be/src/io/cache/lru_queue_recorder.cpp @@ -62,6 +62,15 @@ void LRUQueueRecorder::replay_queue_event(FileCacheType type) { } break; } + case CacheLRULogType::RESIZE: { + auto it = shadow_queue.get(log->hash, log->offset, lru_log_lock); + if (it != std::list::iterator()) { + shadow_queue.resize(it, log->size, lru_log_lock); + } else { + LOG(WARNING) << "RESIZE failed, doesn't exist in shadow queue"; + } + break; + } default: LOG(WARNING) << "Unknown CacheLRULogType: " << static_cast(log->type); break; diff --git a/be/src/io/cache/lru_queue_recorder.h b/be/src/io/cache/lru_queue_recorder.h index 1f6d69493cf4a8..5bd68b70d555f9 100644 --- a/be/src/io/cache/lru_queue_recorder.h +++ b/be/src/io/cache/lru_queue_recorder.h @@ -31,7 +31,8 @@ enum class CacheLRULogType { ADD = 0, // all of the integer types REMOVE = 1, MOVETOBACK = 2, - INVALID = 3, + RESIZE = 3, + INVALID = 4, }; struct CacheLRULog { diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h index a9ccb0910b66c6..79c0c1538c5659 100644 --- a/be/src/util/runtime_profile.h +++ b/be/src/util/runtime_profile.h @@ -395,7 +395,7 @@ class RuntimeProfile { int64_t level = 2, int64_t condition = 0, int64_t value = 0) : Counter(type, value, level), _condition(condition), - _value(value), + _stored_value(value), _condition_func(condition_func) {} Counter* clone() const override { @@ -405,13 +405,13 @@ class RuntimeProfile { int64_t value() const override { std::lock_guard l(_mutex); - return _value; + return _stored_value; } void conditional_update(int64_t c, int64_t v) { std::lock_guard l(_mutex); if (_condition_func(_condition, c)) { - _value = v; + _stored_value = v; _condition = c; } } @@ -419,7 +419,7 @@ class RuntimeProfile { private: mutable std::mutex _mutex; int64_t _condition; - int64_t _value; + int64_t _stored_value; ConditionCounterFunction _condition_func; }; diff --git a/be/test/io/cache/block_file_cache_test_meta_store.cpp b/be/test/io/cache/block_file_cache_test_meta_store.cpp index 8234d03b5279a5..585c359dd73961 100644 --- a/be/test/io/cache/block_file_cache_test_meta_store.cpp +++ b/be/test/io/cache/block_file_cache_test_meta_store.cpp @@ -511,6 +511,75 @@ TEST_F(BlockFileCacheTest, clear_retains_meta_directory_and_clears_meta_entries) } } +TEST_F(BlockFileCacheTest, handle_already_loaded_block_updates_size_and_tablet) { + config::enable_evict_file_cache_in_advance = false; + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + + io::FileCacheSettings settings; + settings.ttl_queue_size = 5000000; + settings.ttl_queue_elements = 50000; + settings.query_queue_size = 5000000; + settings.query_queue_elements = 50000; + settings.index_queue_size = 5000000; + settings.index_queue_elements = 50000; + settings.disposable_queue_size = 5000000; + settings.disposable_queue_elements = 50000; + settings.capacity = 20000000; + settings.max_file_block_size = 100000; + settings.max_query_cache_size = 30; + + io::BlockFileCache cache(cache_base_path, settings); + ASSERT_TRUE(cache.initialize()); + for (int i = 0; i < 100; ++i) { + if (cache.get_async_open_success()) { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + ASSERT_TRUE(cache.get_async_open_success()); + + io::CacheContext context; + ReadStatistics rstats; + context.stats = &rstats; + context.cache_type = io::FileCacheType::NORMAL; + context.query_id.hi = 11; + context.query_id.lo = 12; + context.tablet_id = 0; + auto key = io::BlockFileCache::hash("sync_cached_block_meta_key"); + + constexpr size_t kOriginalSize = 100000; + auto holder = cache.get_or_set(key, 0, kOriginalSize, context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0], kOriginalSize); + blocks.clear(); + + auto* fs_storage = dynamic_cast(cache._storage.get()); + ASSERT_NE(fs_storage, nullptr) << "Expected FSFileCacheStorage but got different storage type"; + + constexpr size_t kNewSize = 2 * kOriginalSize; + constexpr int64_t kTabletId = 4242; + bool handled = false; + { + SCOPED_CACHE_LOCK(cache._mutex, (&cache)); + handled = fs_storage->handle_already_loaded_block(&cache, key, 0, kNewSize, kTabletId, + cache_lock); + } + + ASSERT_TRUE(handled); + auto& cell = cache._files[key][0]; + EXPECT_EQ(cell.file_block->tablet_id(), kTabletId); + EXPECT_EQ(cache._cur_cache_size, kNewSize); + EXPECT_EQ(cache._normal_queue.get_capacity_unsafe(), kNewSize); + + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } +} TEST_F(BlockFileCacheTest, estimate_file_count_skips_removed_directory) { std::string test_dir = cache_base_path + "/estimate_file_count_removed_dir"; if (fs::exists(test_dir)) { diff --git a/be/test/io/cache/lru_queue_test.cpp b/be/test/io/cache/lru_queue_test.cpp index 4a01fb27e3dcfe..2a9cdc3a6bc672 100644 --- a/be/test/io/cache/lru_queue_test.cpp +++ b/be/test/io/cache/lru_queue_test.cpp @@ -115,3 +115,14 @@ TEST_F(LRUQueueTest, SameElementsDifferentOrder) { EXPECT_EQ(queue1->levenshtein_distance_from(*queue2, lock), 2); } + +TEST_F(LRUQueueTest, ResizeUpdatesCacheSize) { + std::mutex mutex; + std::lock_guard lock(mutex); + + auto iter = queue1->add(UInt128Wrapper(123), 0, 1024, lock); + EXPECT_EQ(queue1->get_capacity(lock), 1024); + + queue1->resize(iter, 2048, lock); + EXPECT_EQ(queue1->get_capacity(lock), 2048); +}