Skip to content

Commit fea57a0

Browse files
authored
branch-4.0: [refact](inverted index) refact compound idx writer #59219 (#59364)
cherry pick from #59219
1 parent 9cec72e commit fea57a0

File tree

5 files changed

+53
-41
lines changed

5 files changed

+53
-41
lines changed

be/src/olap/rowset/segment_v2/index_storage_format_v2.cpp

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ IndexStorageFormatV2::IndexStorageFormatV2(IndexFileWriter* index_file_writer)
3939
: IndexStorageFormat(index_file_writer) {}
4040

4141
Status IndexStorageFormatV2::write() {
42-
std::unique_ptr<lucene::store::Directory, DirectoryDeleter> out_dir = nullptr;
4342
std::unique_ptr<lucene::store::IndexOutput> compound_file_output = nullptr;
4443
ErrorContext error_context;
4544
try {
@@ -48,11 +47,13 @@ Status IndexStorageFormatV2::write() {
4847
// Prepare file metadata
4948
auto file_metadata = prepare_file_metadata(current_offset);
5049

51-
// Create output stream
52-
auto result = create_output_stream();
53-
out_dir = std::move(result.first);
54-
compound_file_output = std::move(result.second);
55-
VLOG_DEBUG << fmt::format("Output compound index file to streams: {}", out_dir->toString());
50+
// Create output stream directly without directory operations.
51+
// This is important for cloud storage (like S3) where directory operations are not
52+
// supported or unnecessary.
53+
compound_file_output = create_output_stream();
54+
auto index_path = InvertedIndexDescriptor::get_index_file_path_v2(
55+
_index_file_writer->_index_path_prefix);
56+
VLOG_DEBUG << fmt::format("Output compound index file to: {}", index_path);
5657

5758
// Write version and number of indices
5859
write_version_and_indices_count(compound_file_output.get());
@@ -75,10 +76,7 @@ Status IndexStorageFormatV2::write() {
7576
error_context.err_msg.append(err.what());
7677
LOG(ERROR) << error_context.err_msg;
7778
}
78-
FINALLY({
79-
FINALLY_CLOSE(compound_file_output);
80-
FINALLY_CLOSE(out_dir);
81-
})
79+
FINALLY({ FINALLY_CLOSE(compound_file_output); })
8280

8381
return Status::OK();
8482
}
@@ -177,21 +175,16 @@ std::vector<FileMetadata> IndexStorageFormatV2::prepare_file_metadata(int64_t& c
177175
return file_metadata;
178176
}
179177

180-
std::pair<std::unique_ptr<lucene::store::Directory, DirectoryDeleter>,
181-
std::unique_ptr<lucene::store::IndexOutput>>
182-
IndexStorageFormatV2::create_output_stream() {
183-
io::Path index_path {InvertedIndexDescriptor::get_index_file_path_v2(
184-
_index_file_writer->_index_path_prefix)};
185-
186-
auto* out_dir = DorisFSDirectoryFactory::getDirectory(_index_file_writer->_fs,
187-
index_path.parent_path().c_str());
188-
out_dir->set_file_writer_opts(_index_file_writer->_opts);
189-
std::unique_ptr<lucene::store::Directory, DirectoryDeleter> out_dir_ptr(out_dir);
190-
178+
std::unique_ptr<lucene::store::IndexOutput> IndexStorageFormatV2::create_output_stream() {
179+
// For V2 format, we create the output stream directly using the file writer,
180+
// bypassing the directory layer entirely. This optimization is especially important
181+
// for cloud storage (like S3) where:
182+
// 1. Directory operations (exists, create_directory) are unnecessary overhead
183+
// 2. S3 doesn't have a real directory concept - directories are just key prefixes
184+
// 3. The file writer is already created and ready to use
191185
DCHECK(_index_file_writer->_idx_v2_writer != nullptr)
192186
<< "inverted index file writer v2 is nullptr";
193-
auto compound_file_output = out_dir->createOutputV2(_index_file_writer->_idx_v2_writer.get());
194-
return {std::move(out_dir_ptr), std::move(compound_file_output)};
187+
return DorisFSDirectory::FSIndexOutputV2::create(_index_file_writer->_idx_v2_writer.get());
195188
}
196189

197190
void IndexStorageFormatV2::write_version_and_indices_count(lucene::store::IndexOutput* output) {

be/src/olap/rowset/segment_v2/index_storage_format_v2.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,10 @@ class IndexStorageFormatV2 : public IndexStorageFormat {
4343
private:
4444
int64_t header_length();
4545
std::vector<FileMetadata> prepare_file_metadata(int64_t& current_offset);
46-
virtual std::pair<std::unique_ptr<lucene::store::Directory, DirectoryDeleter>,
47-
std::unique_ptr<lucene::store::IndexOutput>>
48-
create_output_stream();
46+
// Creates the output stream for writing the compound file.
47+
// For V2 format, we directly create FSIndexOutputV2 using the file writer,
48+
// avoiding unnecessary directory operations (important for cloud storage like S3).
49+
virtual std::unique_ptr<lucene::store::IndexOutput> create_output_stream();
4950
void write_version_and_indices_count(lucene::store::IndexOutput* output);
5051
virtual void write_index_headers_and_metadata(lucene::store::IndexOutput* output,
5152
const std::vector<FileMetadata>& file_metadata);

be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,26 @@ int64_t DorisFSDirectory::FSIndexOutputV2::length() const {
453453
return _index_v2_file_writer->bytes_appended();
454454
}
455455

456+
std::unique_ptr<lucene::store::IndexOutput> DorisFSDirectory::FSIndexOutputV2::create(
457+
io::FileWriter* file_writer) {
458+
auto ret = std::make_unique<FSIndexOutputV2>();
459+
ErrorContext error_context;
460+
try {
461+
ret->init(file_writer);
462+
} catch (CLuceneError& err) {
463+
error_context.eptr = std::current_exception();
464+
error_context.err_msg.append("FSIndexOutputV2::create init error: ");
465+
error_context.err_msg.append(err.what());
466+
LOG(ERROR) << error_context.err_msg;
467+
}
468+
FINALLY_EXCEPTION({
469+
if (error_context.eptr) {
470+
FINALLY_CLOSE(ret);
471+
}
472+
})
473+
return ret;
474+
}
475+
456476
DorisFSDirectory::DorisFSDirectory() {
457477
filemode = 0644;
458478
this->lockFactory = nullptr;

be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,11 @@ class DorisFSDirectory::FSIndexOutputV2 : public lucene::store::BufferedIndexOut
246246
~FSIndexOutputV2() override;
247247
void close() override;
248248
int64_t length() const override;
249+
250+
// Static factory method to create FSIndexOutputV2 directly without Directory object.
251+
// This is useful for compound file creation where we already have a FileWriter
252+
// and don't need directory operations (especially for cloud storage like S3).
253+
static std::unique_ptr<lucene::store::IndexOutput> create(io::FileWriter* file_writer);
249254
};
250255

251256
/**

be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -692,9 +692,8 @@ class IndexStorageFormatV2MockCreateOutputStream : public IndexStorageFormatV2 {
692692
IndexStorageFormatV2MockCreateOutputStream(IndexFileWriter* index_file_writer)
693693
: IndexStorageFormatV2(index_file_writer) {}
694694

695-
MOCK_METHOD((std::pair<std::unique_ptr<lucene::store::Directory, DirectoryDeleter>,
696-
std::unique_ptr<lucene::store::IndexOutput>>),
697-
create_output_stream, (), (override));
695+
MOCK_METHOD((std::unique_ptr<lucene::store::IndexOutput>), create_output_stream, (),
696+
(override));
698697
};
699698

700699
class IndexFileWriterMockCreateOutputStreamV1 : public IndexFileWriter {
@@ -808,12 +807,9 @@ TEST_F(IndexFileWriterTest, WriteV2OutputTest) {
808807
EXPECT_CALL(
809808
*(IndexStorageFormatV2MockCreateOutputStream*)writer_mock._index_storage_format.get(),
810809
create_output_stream())
811-
.WillOnce(::testing::Invoke(
812-
[&]() -> std::pair<std::unique_ptr<lucene::store::Directory, DirectoryDeleter>,
813-
std::unique_ptr<lucene::store::IndexOutput>> {
814-
return std::make_pair(std::move(out_dir_ptr),
815-
std::move(compound_file_output));
816-
}));
810+
.WillOnce(::testing::Invoke([&]() -> std::unique_ptr<lucene::store::IndexOutput> {
811+
return std::move(compound_file_output);
812+
}));
817813

818814
int64_t index_id = 1;
819815
std::string index_suffix = "suffix1";
@@ -871,12 +867,9 @@ TEST_F(IndexFileWriterTest, WriteV2OutputCloseErrorTest) {
871867
EXPECT_CALL(
872868
*(IndexStorageFormatV2MockCreateOutputStream*)writer_mock._index_storage_format.get(),
873869
create_output_stream())
874-
.WillOnce(::testing::Invoke(
875-
[&]() -> std::pair<std::unique_ptr<lucene::store::Directory, DirectoryDeleter>,
876-
std::unique_ptr<lucene::store::IndexOutput>> {
877-
return std::make_pair(std::move(out_dir_ptr),
878-
std::move(compound_file_output));
879-
}));
870+
.WillOnce(::testing::Invoke([&]() -> std::unique_ptr<lucene::store::IndexOutput> {
871+
return std::move(compound_file_output);
872+
}));
880873

881874
int64_t index_id = 1;
882875
std::string index_suffix = "suffix1";

0 commit comments

Comments
 (0)