Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions be/src/io/cache/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,17 @@ FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& hash, const CacheC
return nullptr; /// Empty files are not cached.
}

VLOG_DEBUG << "Adding file block to cache. size=" << size << " hash=" << hash.to_string()
<< " offset=" << offset << " cache_type=" << cache_type_to_string(context.cache_type)
<< " expiration_time=" << context.expiration_time
<< " tablet_id=" << context.tablet_id;

if (size > 1024 * 1024 * 1024) {
LOG(WARNING) << "File block size is too large for a block. size=" << size
<< " hash=" << hash.to_string() << " offset=" << offset
<< " stack:" << get_stack_trace();
}

auto& offsets = _files[hash];
auto itr = offsets.find(offset);
if (itr != offsets.end()) {
Expand Down Expand Up @@ -1211,10 +1222,10 @@ void BlockFileCache::reset_range(const UInt128Wrapper& hash, size_t offset, size
if (cell->queue_iterator) {
auto& queue = get_queue(cell->file_block->cache_type());
DCHECK(queue.contains(hash, offset, cache_lock));
auto iter = queue.get(hash, offset, cache_lock);
iter->size = new_size;
queue.cache_size -= old_size;
queue.cache_size += new_size;
queue.resize(*cell->queue_iterator, new_size, cache_lock);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is easily to map queue.resize to std::queue.resize.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does that mean? need to change to another name?

_lru_recorder->record_queue_event(cell->file_block->cache_type(), CacheLRULogType::RESIZE,
cell->file_block->get_hash_value(),
cell->file_block->offset(), new_size);
}
_cur_cache_size -= old_size;
_cur_cache_size += new_size;
Expand Down Expand Up @@ -1523,6 +1534,13 @@ void LRUQueue::remove_all(std::lock_guard<std::mutex>& /* cache_lock */) {
void LRUQueue::move_to_end(Iterator queue_it, std::lock_guard<std::mutex>& /* cache_lock */) {
queue.splice(queue.end(), queue, queue_it);
}

void LRUQueue::resize(Iterator queue_it, size_t new_size,
std::lock_guard<std::mutex>& /* cache_lock */) {
cache_size -= queue_it->size;
queue_it->size = new_size;
cache_size += new_size;
}
bool LRUQueue::contains(const UInt128Wrapper& hash, size_t offset,
std::lock_guard<std::mutex>& /* cache_lock */) const {
return map.find(std::make_pair(hash, offset)) != map.end();
Expand Down
2 changes: 2 additions & 0 deletions be/src/io/cache/file_cache_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ class LRUQueue {

void move_to_end(Iterator queue_it, std::lock_guard<std::mutex>& cache_lock);

void resize(Iterator queue_it, size_t new_size, std::lock_guard<std::mutex>& cache_lock);

std::string to_string(std::lock_guard<std::mutex>& cache_lock) const;

bool contains(const UInt128Wrapper& hash, size_t offset,
Expand Down
41 changes: 33 additions & 8 deletions be/src/io/cache/fs_file_cache_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,31 @@ Status FSFileCacheStorage::parse_filename_suffix_to_cache_type(
return Status::OK();
}

bool FSFileCacheStorage::handle_already_loaded_block(
BlockFileCache* mgr, const UInt128Wrapper& hash, size_t offset, size_t new_size,
int64_t tablet_id, std::lock_guard<std::mutex>& cache_lock) const {
auto file_it = mgr->_files.find(hash);
if (file_it == mgr->_files.end()) {
return false;
}

auto cell_it = file_it->second.find(offset);
if (cell_it == file_it->second.end()) {
return false;
}

auto block = cell_it->second.file_block;
if (tablet_id != 0 && block->tablet_id() == 0) {
block->set_tablet_id(tablet_id);
}

size_t old_size = block->range().size();
if (old_size != new_size) {
mgr->reset_range(hash, offset, old_size, new_size, cache_lock);
}
return true;
}

void FSFileCacheStorage::load_cache_info_into_memory_from_fs(BlockFileCache* _mgr) const {
int scan_length = 10000;
std::vector<BatchLoadArgs> batch_load_buffer;
Expand All @@ -592,8 +617,8 @@ void FSFileCacheStorage::load_cache_info_into_memory_from_fs(BlockFileCache* _mg

auto f = [&](const BatchLoadArgs& args) {
// in async load mode, a cell may be added twice.
if (_mgr->_files.contains(args.hash) && _mgr->_files[args.hash].contains(args.offset)) {
// TODO(zhengyu): update type&expiration if need
if (handle_already_loaded_block(_mgr, args.hash, args.offset, args.size,
args.ctx.tablet_id, cache_lock)) {
return;
}
// if the file is tmp, it means it is the old file and it should be removed
Expand Down Expand Up @@ -773,11 +798,8 @@ void FSFileCacheStorage::load_cache_info_into_memory_from_db(BlockFileCache* _mg

auto f = [&](const BatchLoadArgs& args) {
// in async load mode, a cell may be added twice.
if (_mgr->_files.contains(args.hash) && _mgr->_files[args.hash].contains(args.offset)) {
auto block = _mgr->_files[args.hash][args.offset].file_block;
if (block->tablet_id() == 0) {
block->set_tablet_id(args.ctx.tablet_id);
}
if (handle_already_loaded_block(_mgr, args.hash, args.offset, args.size,
args.ctx.tablet_id, cache_lock)) {
return;
}
_mgr->add_cell(args.hash, args.ctx, args.offset, args.size,
Expand Down Expand Up @@ -920,7 +942,10 @@ void FSFileCacheStorage::load_blocks_directly_unlocked(BlockFileCache* mgr, cons
context_original.cache_type = static_cast<FileCacheType>(block_meta->type);
context_original.tablet_id = key.meta.tablet_id;

if (!mgr->_files.contains(key.hash) || !mgr->_files[key.hash].contains(key.offset)) {
if (handle_already_loaded_block(mgr, key.hash, key.offset, block_meta->size, key.meta.tablet_id,
cache_lock)) {
return;
} else {
mgr->add_cell(key.hash, context_original, key.offset, block_meta->size,
FileBlock::State::DOWNLOADED, cache_lock);
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/io/cache/fs_file_cache_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ class FSFileCacheStorage : public FileCacheStorage {

void load_cache_info_into_memory(BlockFileCache* _mgr) const;

bool handle_already_loaded_block(BlockFileCache* mgr, const UInt128Wrapper& hash, size_t offset,
size_t new_size, int64_t tablet_id,
std::lock_guard<std::mutex>& cache_lock) const;

private:
// Helper function to count files in cache directory using statfs
size_t estimate_file_count_from_statfs() const;
Expand Down
9 changes: 9 additions & 0 deletions be/src/io/cache/lru_queue_recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ void LRUQueueRecorder::replay_queue_event(FileCacheType type) {
}
break;
}
case CacheLRULogType::RESIZE: {
auto it = shadow_queue.get(log->hash, log->offset, lru_log_lock);
if (it != std::list<LRUQueue::FileKeyAndOffset>::iterator()) {
shadow_queue.resize(it, log->size, lru_log_lock);
} else {
LOG(WARNING) << "RESIZE failed, doesn't exist in shadow queue";
}
break;
}
default:
LOG(WARNING) << "Unknown CacheLRULogType: " << static_cast<int>(log->type);
break;
Expand Down
3 changes: 2 additions & 1 deletion be/src/io/cache/lru_queue_recorder.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ enum class CacheLRULogType {
ADD = 0, // all of the integer types
REMOVE = 1,
MOVETOBACK = 2,
INVALID = 3,
RESIZE = 3,
INVALID = 4,
};

struct CacheLRULog {
Expand Down
8 changes: 4 additions & 4 deletions be/src/util/runtime_profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ class RuntimeProfile {
int64_t level = 2, int64_t condition = 0, int64_t value = 0)
: Counter(type, value, level),
_condition(condition),
_value(value),
_stored_value(value),
_condition_func(condition_func) {}

Counter* clone() const override {
Expand All @@ -405,21 +405,21 @@ class RuntimeProfile {

int64_t value() const override {
std::lock_guard<std::mutex> l(_mutex);
return _value;
return _stored_value;
}

void conditional_update(int64_t c, int64_t v) {
std::lock_guard<std::mutex> l(_mutex);
if (_condition_func(_condition, c)) {
_value = v;
_stored_value = v;
_condition = c;
}
}

private:
mutable std::mutex _mutex;
int64_t _condition;
int64_t _value;
int64_t _stored_value;
ConditionCounterFunction _condition_func;
};

Expand Down
69 changes: 69 additions & 0 deletions be/test/io/cache/block_file_cache_test_meta_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,75 @@ TEST_F(BlockFileCacheTest, clear_retains_meta_directory_and_clears_meta_entries)
}
}

TEST_F(BlockFileCacheTest, handle_already_loaded_block_updates_size_and_tablet) {
config::enable_evict_file_cache_in_advance = false;
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
fs::create_directories(cache_base_path);

io::FileCacheSettings settings;
settings.ttl_queue_size = 5000000;
settings.ttl_queue_elements = 50000;
settings.query_queue_size = 5000000;
settings.query_queue_elements = 50000;
settings.index_queue_size = 5000000;
settings.index_queue_elements = 50000;
settings.disposable_queue_size = 5000000;
settings.disposable_queue_elements = 50000;
settings.capacity = 20000000;
settings.max_file_block_size = 100000;
settings.max_query_cache_size = 30;

io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; ++i) {
if (cache.get_async_open_success()) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
ASSERT_TRUE(cache.get_async_open_success());

io::CacheContext context;
ReadStatistics rstats;
context.stats = &rstats;
context.cache_type = io::FileCacheType::NORMAL;
context.query_id.hi = 11;
context.query_id.lo = 12;
context.tablet_id = 0;
auto key = io::BlockFileCache::hash("sync_cached_block_meta_key");

constexpr size_t kOriginalSize = 100000;
auto holder = cache.get_or_set(key, 0, kOriginalSize, context);
auto blocks = fromHolder(holder);
ASSERT_EQ(blocks.size(), 1);
ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id());
download(blocks[0], kOriginalSize);
blocks.clear();

auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(cache._storage.get());
ASSERT_NE(fs_storage, nullptr) << "Expected FSFileCacheStorage but got different storage type";

constexpr size_t kNewSize = 2 * kOriginalSize;
constexpr int64_t kTabletId = 4242;
bool handled = false;
{
SCOPED_CACHE_LOCK(cache._mutex, (&cache));
handled = fs_storage->handle_already_loaded_block(&cache, key, 0, kNewSize, kTabletId,
cache_lock);
}

ASSERT_TRUE(handled);
auto& cell = cache._files[key][0];
EXPECT_EQ(cell.file_block->tablet_id(), kTabletId);
EXPECT_EQ(cache._cur_cache_size, kNewSize);
EXPECT_EQ(cache._normal_queue.get_capacity_unsafe(), kNewSize);

if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
}
TEST_F(BlockFileCacheTest, estimate_file_count_skips_removed_directory) {
std::string test_dir = cache_base_path + "/estimate_file_count_removed_dir";
if (fs::exists(test_dir)) {
Expand Down
11 changes: 11 additions & 0 deletions be/test/io/cache/lru_queue_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,14 @@ TEST_F(LRUQueueTest, SameElementsDifferentOrder) {

EXPECT_EQ(queue1->levenshtein_distance_from(*queue2, lock), 2);
}

TEST_F(LRUQueueTest, ResizeUpdatesCacheSize) {
std::mutex mutex;
std::lock_guard lock(mutex);

auto iter = queue1->add(UInt128Wrapper(123), 0, 1024, lock);
EXPECT_EQ(queue1->get_capacity(lock), 1024);

queue1->resize(iter, 2048, lock);
EXPECT_EQ(queue1->get_capacity(lock), 2048);
}
Loading