Skip to content

Commit 26750be

Browse files
authored
[refact](inverted index) refact compound idx writer (#59219)
### What problem does this PR solve? This PR refactors the compound index writer for the inverted index V2 format to optimize performance, particularly for cloud storage systems like S3. The main improvement is bypassing unnecessary directory operations by directly creating the output stream using an existing file writer. Key changes: Added a static factory method FSIndexOutputV2::create() for direct instantiation without requiring a Directory object Refactored IndexStorageFormatV2::create_output_stream() to return only an IndexOutput instead of a Directory-IndexOutput pair Removed the cloud mode check in DorisFSDirectoryFactory::getDirectory() to simplify code (directory creation is safe for all file systems)
1 parent 3f2a712 commit 26750be

File tree

5 files changed

+72
-63
lines changed

5 files changed

+72
-63
lines changed

be/src/olap/rowset/segment_v2/index_storage_format_v2.cpp

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ IndexStorageFormatV2::IndexStorageFormatV2(IndexFileWriter* index_file_writer)
3939
: IndexStorageFormat(index_file_writer) {}
4040

4141
Status IndexStorageFormatV2::write() {
42-
std::unique_ptr<lucene::store::Directory, DirectoryDeleter> out_dir = nullptr;
4342
std::unique_ptr<lucene::store::IndexOutput> compound_file_output = nullptr;
4443
ErrorContext error_context;
4544
try {
@@ -48,11 +47,13 @@ Status IndexStorageFormatV2::write() {
4847
// Prepare file metadata
4948
auto file_metadata = prepare_file_metadata(current_offset);
5049

51-
// Create output stream
52-
auto result = create_output_stream();
53-
out_dir = std::move(result.first);
54-
compound_file_output = std::move(result.second);
55-
VLOG_DEBUG << fmt::format("Output compound index file to streams: {}", out_dir->toString());
50+
// Create output stream directly without directory operations.
51+
// This is important for cloud storage (like S3) where directory operations are not
52+
// supported or unnecessary.
53+
compound_file_output = create_output_stream();
54+
auto index_path = InvertedIndexDescriptor::get_index_file_path_v2(
55+
_index_file_writer->_index_path_prefix);
56+
VLOG_DEBUG << fmt::format("Output compound index file to: {}", index_path);
5657

5758
// Write version and number of indices
5859
write_version_and_indices_count(compound_file_output.get());
@@ -75,10 +76,7 @@ Status IndexStorageFormatV2::write() {
7576
error_context.err_msg.append(err.what());
7677
LOG(ERROR) << error_context.err_msg;
7778
}
78-
FINALLY({
79-
FINALLY_CLOSE(compound_file_output);
80-
FINALLY_CLOSE(out_dir);
81-
})
79+
FINALLY({ FINALLY_CLOSE(compound_file_output); })
8280

8381
return Status::OK();
8482
}
@@ -177,21 +175,16 @@ std::vector<FileMetadata> IndexStorageFormatV2::prepare_file_metadata(int64_t& c
177175
return file_metadata;
178176
}
179177

180-
std::pair<std::unique_ptr<lucene::store::Directory, DirectoryDeleter>,
181-
std::unique_ptr<lucene::store::IndexOutput>>
182-
IndexStorageFormatV2::create_output_stream() {
183-
io::Path index_path {InvertedIndexDescriptor::get_index_file_path_v2(
184-
_index_file_writer->_index_path_prefix)};
185-
186-
auto* out_dir = DorisFSDirectoryFactory::getDirectory(_index_file_writer->_fs,
187-
index_path.parent_path().c_str());
188-
out_dir->set_file_writer_opts(_index_file_writer->_opts);
189-
std::unique_ptr<lucene::store::Directory, DirectoryDeleter> out_dir_ptr(out_dir);
190-
178+
std::unique_ptr<lucene::store::IndexOutput> IndexStorageFormatV2::create_output_stream() {
179+
// For V2 format, we create the output stream directly using the file writer,
180+
// bypassing the directory layer entirely. This optimization is especially important
181+
// for cloud storage (like S3) where:
182+
// 1. Directory operations (exists, create_directory) are unnecessary overhead
183+
// 2. S3 doesn't have a real directory concept - directories are just key prefixes
184+
// 3. The file writer is already created and ready to use
191185
DCHECK(_index_file_writer->_idx_v2_writer != nullptr)
192186
<< "inverted index file writer v2 is nullptr";
193-
auto compound_file_output = out_dir->createOutputV2(_index_file_writer->_idx_v2_writer.get());
194-
return {std::move(out_dir_ptr), std::move(compound_file_output)};
187+
return DorisFSDirectory::FSIndexOutputV2::create(_index_file_writer->_idx_v2_writer.get());
195188
}
196189

197190
void IndexStorageFormatV2::write_version_and_indices_count(lucene::store::IndexOutput* output) {

be/src/olap/rowset/segment_v2/index_storage_format_v2.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,10 @@ class IndexStorageFormatV2 : public IndexStorageFormat {
4343
private:
4444
int64_t header_length();
4545
std::vector<FileMetadata> prepare_file_metadata(int64_t& current_offset);
46-
virtual std::pair<std::unique_ptr<lucene::store::Directory, DirectoryDeleter>,
47-
std::unique_ptr<lucene::store::IndexOutput>>
48-
create_output_stream();
46+
// Creates the output stream for writing the compound file.
47+
// For V2 format, we directly create FSIndexOutputV2 using the file writer,
48+
// avoiding unnecessary directory operations (important for cloud storage like S3).
49+
virtual std::unique_ptr<lucene::store::IndexOutput> create_output_stream();
4950
void write_version_and_indices_count(lucene::store::IndexOutput* output);
5051
virtual void write_index_headers_and_metadata(lucene::store::IndexOutput* output,
5152
const std::vector<FileMetadata>& file_metadata);

be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp

Lines changed: 39 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,26 @@ int64_t DorisFSDirectory::FSIndexOutputV2::length() const {
454454
return _index_v2_file_writer->bytes_appended();
455455
}
456456

457+
std::unique_ptr<lucene::store::IndexOutput> DorisFSDirectory::FSIndexOutputV2::create(
458+
io::FileWriter* file_writer) {
459+
auto ret = std::make_unique<FSIndexOutputV2>();
460+
ErrorContext error_context;
461+
try {
462+
ret->init(file_writer);
463+
} catch (CLuceneError& err) {
464+
error_context.eptr = std::current_exception();
465+
error_context.err_msg.append("FSIndexOutputV2::create init error: ");
466+
error_context.err_msg.append(err.what());
467+
LOG(ERROR) << error_context.err_msg;
468+
}
469+
FINALLY_EXCEPTION({
470+
if (error_context.eptr) {
471+
FINALLY_CLOSE(ret);
472+
}
473+
})
474+
return ret;
475+
}
476+
457477
DorisFSDirectory::DorisFSDirectory() {
458478
filemode = 0644;
459479
this->lockFactory = nullptr;
@@ -905,28 +925,25 @@ DorisFSDirectory* DorisFSDirectoryFactory::getDirectory(const io::FileSystemSPtr
905925
if (config::inverted_index_ram_dir_enable && can_use_ram_dir) {
906926
dir = _CLNEW DorisRAMFSDirectory();
907927
} else {
908-
// cloud mode does not need to create directory
909-
if (!config::is_cloud_mode()) {
910-
bool exists = false;
911-
auto st = _fs->exists(file, &exists);
912-
DBUG_EXECUTE_IF("DorisFSDirectoryFactory::getDirectory_exists_status_is_not_ok", {
913-
st = Status::Error<ErrorCode::INTERNAL_ERROR>(
914-
"debug point: "
915-
"DorisFSDirectoryFactory::getDirectory_exists_status_is_not_ok");
916-
})
917-
LOG_AND_THROW_IF_ERROR(st, "Get directory exists IO error");
918-
if (!exists) {
919-
st = _fs->create_directory(file);
920-
DBUG_EXECUTE_IF(
921-
"DorisFSDirectoryFactory::getDirectory_create_directory_status_is_not_ok", {
922-
st = Status::Error<ErrorCode::INTERNAL_ERROR>(
923-
"debug point: "
924-
"DorisFSDirectoryFactory::getDirectory_create_directory_status_"
925-
"is_"
926-
"not_ok");
927-
})
928-
LOG_AND_THROW_IF_ERROR(st, "Get directory create directory IO error");
929-
}
928+
bool exists = false;
929+
auto st = _fs->exists(file, &exists);
930+
DBUG_EXECUTE_IF("DorisFSDirectoryFactory::getDirectory_exists_status_is_not_ok", {
931+
st = Status::Error<ErrorCode::INTERNAL_ERROR>(
932+
"debug point: "
933+
"DorisFSDirectoryFactory::getDirectory_exists_status_is_not_ok");
934+
})
935+
LOG_AND_THROW_IF_ERROR(st, "Get directory exists IO error");
936+
if (!exists) {
937+
st = _fs->create_directory(file);
938+
DBUG_EXECUTE_IF(
939+
"DorisFSDirectoryFactory::getDirectory_create_directory_status_is_not_ok", {
940+
st = Status::Error<ErrorCode::INTERNAL_ERROR>(
941+
"debug point: "
942+
"DorisFSDirectoryFactory::getDirectory_create_directory_status_"
943+
"is_"
944+
"not_ok");
945+
})
946+
LOG_AND_THROW_IF_ERROR(st, "Get directory create directory IO error");
930947
}
931948
dir = _CLNEW DorisFSDirectory();
932949
}

be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,11 @@ class DorisFSDirectory::FSIndexOutputV2 : public lucene::store::BufferedIndexOut
246246
~FSIndexOutputV2() override;
247247
void close() override;
248248
int64_t length() const override;
249+
250+
// Static factory method to create FSIndexOutputV2 directly without Directory object.
251+
// This is useful for compound file creation where we already have a FileWriter
252+
// and don't need directory operations (especially for cloud storage like S3).
253+
static std::unique_ptr<lucene::store::IndexOutput> create(io::FileWriter* file_writer);
249254
};
250255

251256
/**

be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -692,9 +692,8 @@ class IndexStorageFormatV2MockCreateOutputStream : public IndexStorageFormatV2 {
692692
IndexStorageFormatV2MockCreateOutputStream(IndexFileWriter* index_file_writer)
693693
: IndexStorageFormatV2(index_file_writer) {}
694694

695-
MOCK_METHOD((std::pair<std::unique_ptr<lucene::store::Directory, DirectoryDeleter>,
696-
std::unique_ptr<lucene::store::IndexOutput>>),
697-
create_output_stream, (), (override));
695+
MOCK_METHOD((std::unique_ptr<lucene::store::IndexOutput>), create_output_stream, (),
696+
(override));
698697
};
699698

700699
class IndexFileWriterMockCreateOutputStreamV1 : public IndexFileWriter {
@@ -808,12 +807,9 @@ TEST_F(IndexFileWriterTest, WriteV2OutputTest) {
808807
EXPECT_CALL(
809808
*(IndexStorageFormatV2MockCreateOutputStream*)writer_mock._index_storage_format.get(),
810809
create_output_stream())
811-
.WillOnce(::testing::Invoke(
812-
[&]() -> std::pair<std::unique_ptr<lucene::store::Directory, DirectoryDeleter>,
813-
std::unique_ptr<lucene::store::IndexOutput>> {
814-
return std::make_pair(std::move(out_dir_ptr),
815-
std::move(compound_file_output));
816-
}));
810+
.WillOnce(::testing::Invoke([&]() -> std::unique_ptr<lucene::store::IndexOutput> {
811+
return std::move(compound_file_output);
812+
}));
817813

818814
int64_t index_id = 1;
819815
std::string index_suffix = "suffix1";
@@ -871,12 +867,9 @@ TEST_F(IndexFileWriterTest, WriteV2OutputCloseErrorTest) {
871867
EXPECT_CALL(
872868
*(IndexStorageFormatV2MockCreateOutputStream*)writer_mock._index_storage_format.get(),
873869
create_output_stream())
874-
.WillOnce(::testing::Invoke(
875-
[&]() -> std::pair<std::unique_ptr<lucene::store::Directory, DirectoryDeleter>,
876-
std::unique_ptr<lucene::store::IndexOutput>> {
877-
return std::make_pair(std::move(out_dir_ptr),
878-
std::move(compound_file_output));
879-
}));
870+
.WillOnce(::testing::Invoke([&]() -> std::unique_ptr<lucene::store::IndexOutput> {
871+
return std::move(compound_file_output);
872+
}));
880873

881874
int64_t index_id = 1;
882875
std::string index_suffix = "suffix1";

0 commit comments

Comments
 (0)