Skip to content

Commit b3c82cb

Browse files
freemandealer authored and dataroaring committed
[fix](filecache) add check and exception handling for empty block file
The current code did not check for or handle the case where a block file fails to download (leaving an empty file). Add a check and a handler for that case. Signed-off-by: zhengyu <zhangzhengyu@selectdb.com>
1 parent 1b81208 commit b3c82cb

File tree

3 files changed

+147
-2
lines changed

3 files changed

+147
-2
lines changed

be/src/io/cache/file_block.cpp

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,12 @@ void FileBlock::reset_downloader_impl(std::lock_guard<std::mutex>& block_lock) {
113113

114114
Status FileBlock::set_downloaded(std::lock_guard<std::mutex>& /* block_lock */) {
115115
DCHECK(_download_state != State::DOWNLOADED);
116-
DCHECK_NE(_downloaded_size, 0);
116+
if (_downloaded_size == 0) {
117+
_download_state = State::EMPTY;
118+
_downloader_id = 0;
119+
return Status::InternalError("Try to set empty block {} as downloaded",
120+
_block_range.to_string());
121+
}
117122
Status status = _mgr->_storage->finalize(_key, this->_block_range.size());
118123
if (status.ok()) [[likely]] {
119124
_download_state = State::DOWNLOADED;
@@ -147,7 +152,15 @@ Status FileBlock::append(Slice data) {
147152
}
148153

149154
Status FileBlock::finalize() {
150-
if (_downloaded_size != 0 && _downloaded_size != _block_range.size()) {
155+
if (_downloaded_size == 0) {
156+
std::lock_guard block_lock(_mutex);
157+
_download_state = State::EMPTY;
158+
_downloader_id = 0;
159+
_cv.notify_all();
160+
return Status::InternalError("Try to finalize an empty file block {}",
161+
_block_range.to_string());
162+
}
163+
if (_downloaded_size != _block_range.size()) {
151164
SCOPED_CACHE_LOCK(_mgr->_mutex, _mgr);
152165
size_t old_size = _block_range.size();
153166
_block_range.right = _block_range.left + _downloaded_size - 1;

be/src/io/cache/file_block.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class FileBlock {
4444
friend class BlockFileCache;
4545
friend class CachedRemoteFileReader;
4646
friend struct FileBlockCell;
47+
friend class FileBlockTestAccessor;
4748

4849
public:
4950
enum class State {

be/test/io/cache/block_file_cache_test.cpp

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,22 @@
2323

2424
namespace doris::io {
2525

26+
class FileBlockTestAccessor {
27+
public:
28+
static void set_state(FileBlock& block, FileBlock::State state) {
29+
block._download_state = state;
30+
}
31+
static void set_downloader_id(FileBlock& block, uint64_t id) { block._downloader_id = id; }
32+
static void set_downloaded_size(FileBlock& block, size_t size) {
33+
block._downloaded_size = size;
34+
}
35+
36+
static Status call_set_downloaded(FileBlock& block) {
37+
std::lock_guard<std::mutex> lock(block._mutex);
38+
return block.set_downloaded(lock);
39+
}
40+
};
41+
2642
fs::path caches_dir = fs::current_path() / "lru_cache_test";
2743
std::string cache_base_path = caches_dir / "cache1" / "";
2844
std::string tmp_file = caches_dir / "tmp_file";
@@ -7671,4 +7687,119 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_direct_read_bytes_check) {
76717687
FileCacheFactory::instance()->_capacity = 0;
76727688
}
76737689

7690+
TEST_F(BlockFileCacheTest, finalize_empty_block) {
7691+
std::string my_cache_path = caches_dir / "empty_block_test" / "";
7692+
if (fs::exists(my_cache_path)) {
7693+
fs::remove_all(my_cache_path);
7694+
}
7695+
io::FileCacheSettings settings;
7696+
settings.capacity = 100;
7697+
settings.max_file_block_size = 100;
7698+
io::BlockFileCache mgr(my_cache_path, settings);
7699+
ASSERT_TRUE(mgr.initialize().ok());
7700+
7701+
for (int i = 0; i < 100; i++) {
7702+
if (mgr.get_async_open_success()) {
7703+
break;
7704+
};
7705+
std::this_thread::sleep_for(std::chrono::milliseconds(1));
7706+
}
7707+
7708+
auto key = io::BlockFileCache::hash("empty_block_test");
7709+
io::CacheContext context;
7710+
ReadStatistics rstats;
7711+
context.stats = &rstats;
7712+
context.cache_type = io::FileCacheType::NORMAL;
7713+
7714+
{
7715+
auto holder = mgr.get_or_set(key, 0, 10, context);
7716+
auto blocks = fromHolder(holder);
7717+
ASSERT_EQ(blocks.size(), 1);
7718+
auto block = blocks[0];
7719+
ASSERT_EQ(block->state(), io::FileBlock::State::EMPTY);
7720+
7721+
ASSERT_EQ(block->get_or_set_downloader(), io::FileBlock::get_caller_id());
7722+
ASSERT_EQ(block->state(), io::FileBlock::State::DOWNLOADING);
7723+
7724+
// Call finalize without calling append()
7725+
Status st = block->finalize();
7726+
ASSERT_FALSE(st.ok());
7727+
ASSERT_EQ(block->state(), io::FileBlock::State::EMPTY);
7728+
ASSERT_EQ(block->get_downloader(), 0);
7729+
}
7730+
if (fs::exists(my_cache_path)) {
7731+
fs::remove_all(my_cache_path);
7732+
}
7733+
}
7734+
7735+
TEST_F(BlockFileCacheTest, finalize_partial_block) {
7736+
std::string my_cache_path = caches_dir / "partial_block_test" / "";
7737+
if (fs::exists(my_cache_path)) {
7738+
fs::remove_all(my_cache_path);
7739+
}
7740+
io::FileCacheSettings settings;
7741+
settings.capacity = 100;
7742+
settings.max_file_block_size = 100;
7743+
io::BlockFileCache mgr(my_cache_path, settings);
7744+
ASSERT_TRUE(mgr.initialize().ok());
7745+
7746+
for (int i = 0; i < 100; i++) {
7747+
if (mgr.get_async_open_success()) {
7748+
break;
7749+
};
7750+
std::this_thread::sleep_for(std::chrono::milliseconds(1));
7751+
}
7752+
7753+
auto key = io::BlockFileCache::hash("partial_block_test");
7754+
io::CacheContext context;
7755+
ReadStatistics rstats;
7756+
context.stats = &rstats;
7757+
context.cache_type = io::FileCacheType::NORMAL;
7758+
7759+
{
7760+
auto holder = mgr.get_or_set(key, 0, 10, context);
7761+
auto blocks = fromHolder(holder);
7762+
ASSERT_EQ(blocks.size(), 1);
7763+
auto block = blocks[0];
7764+
ASSERT_EQ(block->get_or_set_downloader(), io::FileBlock::get_caller_id());
7765+
7766+
std::string data(5, '0');
7767+
ASSERT_TRUE(block->append(Slice(data.data(), data.size())).ok());
7768+
7769+
// Finalize a block that only has 5 bytes out of 10
7770+
Status st = block->finalize();
7771+
ASSERT_TRUE(st.ok());
7772+
ASSERT_EQ(block->state(), io::FileBlock::State::DOWNLOADED);
7773+
ASSERT_EQ(block->range().size(), 5);
7774+
ASSERT_EQ(block->range().right, 4);
7775+
}
7776+
7777+
// Verify it was shrunk in the cache
7778+
ASSERT_EQ(mgr.get_used_cache_size(io::FileCacheType::NORMAL), 5);
7779+
7780+
if (fs::exists(my_cache_path)) {
7781+
fs::remove_all(my_cache_path);
7782+
}
7783+
}
7784+
7785+
TEST_F(BlockFileCacheTest, set_downloaded_empty_block_branch) {
7786+
FileCacheKey key;
7787+
key.hash = io::BlockFileCache::hash("set_downloaded_empty_block_branch");
7788+
key.offset = 0;
7789+
key.meta.type = io::FileCacheType::NORMAL;
7790+
key.meta.expiration_time = 0;
7791+
key.meta.tablet_id = 0;
7792+
7793+
// mgr is intentionally nullptr: this branch returns before touching storage.
7794+
io::FileBlock block(key, 10, nullptr, io::FileBlock::State::EMPTY);
7795+
FileBlockTestAccessor::set_state(block, io::FileBlock::State::DOWNLOADING);
7796+
FileBlockTestAccessor::set_downloader_id(block, 123);
7797+
FileBlockTestAccessor::set_downloaded_size(block, 0);
7798+
7799+
Status st = FileBlockTestAccessor::call_set_downloaded(block);
7800+
ASSERT_FALSE(st.ok());
7801+
ASSERT_EQ(block.state(), io::FileBlock::State::EMPTY);
7802+
ASSERT_EQ(block.get_downloader(), 0);
7803+
}
7804+
76747805
} // namespace doris::io

0 commit comments

Comments
 (0)