Skip to content

Commit d336eed

Browse files
[fix](filecache) load meta exit with file NOT_FOUND exception (#59311)
When loading cache block file into memory, the iteration will fail with exceptions caused by concurrency. This commit skips the error and continues with the following iteration.
1 parent 8e12694 commit d336eed

File tree

2 files changed

+201
-6
lines changed

2 files changed

+201
-6
lines changed

be/src/io/cache/fs_file_cache_storage.cpp

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <filesystem>
2727
#include <mutex>
2828
#include <system_error>
29+
#include <vector>
2930

3031
#include "common/logging.h"
3132
#include "common/status.h"
@@ -877,8 +878,8 @@ void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) const
877878
// If the difference is more than threshold, load from filesystem as well
878879
if (estimated_file_count > 100) {
879880
double difference_ratio =
880-
static_cast<double>(estimated_file_count) -
881-
static_cast<double>(db_block_count) / static_cast<double>(estimated_file_count);
881+
(static_cast<double>(estimated_file_count) - static_cast<double>(db_block_count)) /
882+
static_cast<double>(estimated_file_count);
882883

883884
if (difference_ratio > config::file_cache_meta_store_vs_file_system_diff_num_threshold) {
884885
LOG(WARNING) << "Significant difference between DB blocks (" << db_block_count
@@ -983,13 +984,53 @@ size_t FSFileCacheStorage::estimate_file_count_from_statfs() const {
983984
// Get total size of cache directory to estimate file count
984985
std::error_code ec;
985986
uintmax_t total_size = 0;
986-
for (const auto& entry : std::filesystem::recursive_directory_iterator(_cache_base_path, ec)) {
987+
std::vector<std::filesystem::path> pending_dirs {std::filesystem::path(_cache_base_path)};
988+
while (!pending_dirs.empty()) {
989+
auto current_dir = pending_dirs.back();
990+
pending_dirs.pop_back();
991+
992+
std::filesystem::directory_iterator it(current_dir, ec);
987993
if (ec) {
988-
LOG(WARNING) << "Error accessing directory entry: " << ec.message();
994+
LOG(WARNING) << "Failed to list directory while estimating file count, dir="
995+
<< current_dir << ", err=" << ec.message();
996+
ec.clear();
989997
continue;
990998
}
991-
if (entry.is_regular_file()) {
992-
total_size += entry.file_size();
999+
1000+
for (; it != std::filesystem::directory_iterator(); ++it) {
1001+
std::error_code status_ec;
1002+
auto entry_status = it->symlink_status(status_ec);
1003+
TEST_SYNC_POINT_CALLBACK(
1004+
"FSFileCacheStorage::estimate_file_count_from_statfs::AfterEntryStatus",
1005+
&status_ec);
1006+
if (status_ec) {
1007+
LOG(WARNING) << "Failed to stat entry while estimating file count, path="
1008+
<< it->path() << ", err=" << status_ec.message();
1009+
continue;
1010+
}
1011+
1012+
if (std::filesystem::is_directory(entry_status)) {
1013+
auto next_dir = it->path();
1014+
TEST_SYNC_POINT_CALLBACK(
1015+
"FSFileCacheStorage::estimate_file_count_from_statfs::OnDirectory",
1016+
&next_dir);
1017+
pending_dirs.emplace_back(next_dir);
1018+
continue;
1019+
}
1020+
1021+
if (std::filesystem::is_regular_file(entry_status)) {
1022+
std::error_code size_ec;
1023+
auto file_size = it->file_size(size_ec);
1024+
TEST_SYNC_POINT_CALLBACK(
1025+
"FSFileCacheStorage::estimate_file_count_from_statfs::AfterFileSize",
1026+
&size_ec);
1027+
if (size_ec) {
1028+
LOG(WARNING) << "Failed to get file size while estimating file count, path="
1029+
<< it->path() << ", err=" << size_ec.message();
1030+
continue;
1031+
}
1032+
total_size += file_size;
1033+
}
9931034
}
9941035
}
9951036

be/test/io/cache/block_file_cache_test_meta_store.cpp

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,20 @@
1818
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/tests/gtest_lru_file_cache.cpp
1919
// and modified by Doris
2020

21+
#if defined(__clang__)
22+
#pragma clang diagnostic push
23+
#pragma clang diagnostic ignored "-Wkeyword-macro"
24+
#endif
25+
26+
#define private public
27+
#define protected public
2128
#include "block_file_cache_test_common.h"
29+
#undef private
30+
#undef protected
31+
32+
#if defined(__clang__)
33+
#pragma clang diagnostic pop
34+
#endif
2235

2336
namespace doris::io {
2437

@@ -498,6 +511,147 @@ TEST_F(BlockFileCacheTest, clear_retains_meta_directory_and_clears_meta_entries)
498511
}
499512
}
500513

514+
TEST_F(BlockFileCacheTest, estimate_file_count_skips_removed_directory) {
515+
std::string test_dir = cache_base_path + "/estimate_file_count_removed_dir";
516+
if (fs::exists(test_dir)) {
517+
fs::remove_all(test_dir);
518+
}
519+
auto keep_dir = fs::path(test_dir) / "keep";
520+
auto remove_dir = fs::path(test_dir) / "remove";
521+
fs::create_directories(keep_dir);
522+
fs::create_directories(remove_dir);
523+
524+
auto keep_file = keep_dir / "data.bin";
525+
std::string one_mb(1024 * 1024, 'd');
526+
{
527+
std::ofstream ofs(keep_file, std::ios::binary);
528+
ASSERT_TRUE(ofs.good());
529+
for (int i = 0; i < 3; ++i) {
530+
ofs.write(one_mb.data(), one_mb.size());
531+
ASSERT_TRUE(ofs.good());
532+
}
533+
}
534+
535+
FSFileCacheStorage storage;
536+
storage._cache_base_path = test_dir;
537+
538+
const std::string sync_point_name =
539+
"FSFileCacheStorage::estimate_file_count_from_statfs::OnDirectory";
540+
auto* sync_point = doris::SyncPoint::get_instance();
541+
doris::SyncPoint::CallbackGuard guard(sync_point_name);
542+
sync_point->set_call_back(
543+
sync_point_name,
544+
[remove_dir](std::vector<std::any>&& args) {
545+
auto* path = doris::try_any_cast<std::filesystem::path*>(args[0]);
546+
if (*path == remove_dir) {
547+
fs::remove_all(remove_dir);
548+
}
549+
},
550+
&guard);
551+
sync_point->enable_processing();
552+
553+
size_t estimated_files = storage.estimate_file_count_from_statfs();
554+
555+
sync_point->disable_processing();
556+
557+
ASSERT_EQ(3, estimated_files);
558+
ASSERT_FALSE(fs::exists(remove_dir));
559+
560+
if (fs::exists(test_dir)) {
561+
fs::remove_all(test_dir);
562+
}
563+
}
564+
565+
TEST_F(BlockFileCacheTest, estimate_file_count_handles_stat_failure) {
566+
std::string test_dir = cache_base_path + "/estimate_file_count_stat_failure";
567+
if (fs::exists(test_dir)) {
568+
fs::remove_all(test_dir);
569+
}
570+
fs::create_directories(test_dir);
571+
572+
auto data_file = fs::path(test_dir) / "data.bin";
573+
std::string one_mb(1024 * 1024, 'x');
574+
{
575+
std::ofstream ofs(data_file, std::ios::binary);
576+
ASSERT_TRUE(ofs.good());
577+
ofs.write(one_mb.data(), one_mb.size());
578+
ASSERT_TRUE(ofs.good());
579+
}
580+
581+
FSFileCacheStorage storage;
582+
storage._cache_base_path = test_dir;
583+
584+
const std::string sync_point_name =
585+
"FSFileCacheStorage::estimate_file_count_from_statfs::AfterEntryStatus";
586+
auto* sync_point = doris::SyncPoint::get_instance();
587+
doris::SyncPoint::CallbackGuard guard(sync_point_name);
588+
sync_point->set_call_back(
589+
sync_point_name,
590+
[](std::vector<std::any>&& args) {
591+
auto* ec = doris::try_any_cast<std::error_code*>(args[0]);
592+
if (ec != nullptr) {
593+
*ec = std::make_error_code(std::errc::io_error);
594+
}
595+
},
596+
&guard);
597+
sync_point->enable_processing();
598+
599+
size_t estimated_files = storage.estimate_file_count_from_statfs();
600+
601+
sync_point->disable_processing();
602+
603+
ASSERT_EQ(0, estimated_files);
604+
605+
if (fs::exists(test_dir)) {
606+
fs::remove_all(test_dir);
607+
}
608+
}
609+
610+
TEST_F(BlockFileCacheTest, estimate_file_count_handles_file_size_failure) {
611+
std::string test_dir = cache_base_path + "/estimate_file_count_file_size_failure";
612+
if (fs::exists(test_dir)) {
613+
fs::remove_all(test_dir);
614+
}
615+
fs::create_directories(test_dir);
616+
617+
auto data_file = fs::path(test_dir) / "data.bin";
618+
std::string one_mb(1024 * 1024, 'x');
619+
{
620+
std::ofstream ofs(data_file, std::ios::binary);
621+
ASSERT_TRUE(ofs.good());
622+
ofs.write(one_mb.data(), one_mb.size());
623+
ASSERT_TRUE(ofs.good());
624+
}
625+
626+
FSFileCacheStorage storage;
627+
storage._cache_base_path = test_dir;
628+
629+
const std::string sync_point_name =
630+
"FSFileCacheStorage::estimate_file_count_from_statfs::AfterFileSize";
631+
auto* sync_point = doris::SyncPoint::get_instance();
632+
doris::SyncPoint::CallbackGuard guard(sync_point_name);
633+
sync_point->set_call_back(
634+
sync_point_name,
635+
[](std::vector<std::any>&& args) {
636+
auto* ec = doris::try_any_cast<std::error_code*>(args[0]);
637+
if (ec != nullptr) {
638+
*ec = std::make_error_code(std::errc::io_error);
639+
}
640+
},
641+
&guard);
642+
sync_point->enable_processing();
643+
644+
size_t estimated_files = storage.estimate_file_count_from_statfs();
645+
646+
sync_point->disable_processing();
647+
648+
ASSERT_EQ(0, estimated_files);
649+
650+
if (fs::exists(test_dir)) {
651+
fs::remove_all(test_dir);
652+
}
653+
}
654+
501655
//TODO(zhengyu): check lazy load
502656
//TODO(zhengyu): check version2 start
503657
//TODO(zhengyu): check version2 version3 mixed start

0 commit comments

Comments
 (0)