Skip to content

Commit 3e2ed4a

Browse files
authored
[opt](cloud) optimize load performance for inverted index when pack small files (#59011)
Related PR: #57770 Problem Summary: When merging small files with inverted indexes, the segment close operation was synchronously waiting for inverted index files to be uploaded to S3. This blocking behavior significantly impacted the memtable flush thread performance, causing bottlenecks in the data loading pipeline. Solution: The solution introduces a two-phase close mechanism for inverted index file writers: 1. **Asynchronous Close Phase**: During segment close, inverted index files are closed asynchronously and the S3 upload task is submitted immediately without waiting for completion. 2. **Wait Phase**: When the load channel closes, the system waits for all pending S3 upload tasks to complete, ensuring data consistency.
1 parent c6585e3 commit 3e2ed4a

22 files changed

+234
-77
lines changed

be/src/index-tools/index_tool.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -621,11 +621,16 @@ int main(int argc, char** argv) {
621621
_CLLDELETE(analyzer);
622622
_CLLDELETE(char_string_reader);
623623

624-
auto ret = index_file_writer->close();
624+
auto ret = index_file_writer->begin_close();
625625
if (!ret.ok()) {
626626
std::cerr << "IndexFileWriter close error:" << ret.msg() << std::endl;
627627
return -1;
628628
}
629+
ret = index_file_writer->finish_close();
630+
if (!ret.ok()) {
631+
std::cerr << "IndexFileWriter wait close error:" << ret.msg() << std::endl;
632+
return -1;
633+
}
629634
} else if (FLAGS_operation == "show_nested_files_v2") {
630635
if (FLAGS_idx_file_path == "") {
631636
std::cout << "no file flag for show " << std::endl;

be/src/olap/rowset/beta_rowset_writer.cpp

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -206,16 +206,24 @@ Status InvertedIndexFileCollection::add(int seg_id, IndexFileWriterPtr&& index_w
206206
return Status::OK();
207207
}
208208

209-
Status InvertedIndexFileCollection::close() {
209+
Status InvertedIndexFileCollection::begin_close() {
210210
std::lock_guard lock(_lock);
211211
for (auto&& [id, writer] : _inverted_index_file_writers) {
212-
RETURN_IF_ERROR(writer->close());
212+
RETURN_IF_ERROR(writer->begin_close());
213213
_total_size += writer->get_index_file_total_size();
214214
}
215215

216216
return Status::OK();
217217
}
218218

219+
Status InvertedIndexFileCollection::finish_close() {
220+
std::lock_guard lock(_lock);
221+
for (auto&& [id, writer] : _inverted_index_file_writers) {
222+
RETURN_IF_ERROR(writer->finish_close());
223+
}
224+
return Status::OK();
225+
}
226+
219227
Result<std::vector<const InvertedIndexFileInfo*>>
220228
InvertedIndexFileCollection::inverted_index_file_info(int seg_id_offset) {
221229
std::lock_guard lock(_lock);
@@ -1097,7 +1105,8 @@ Status BetaRowsetWriter::create_segment_writer_for_segcompaction(
10971105
_segcompaction_worker->get_file_writer().reset(file_writer.release());
10981106
if (auto& idx_file_writer = _segcompaction_worker->get_inverted_index_file_writer();
10991107
idx_file_writer != nullptr) {
1100-
RETURN_IF_ERROR(idx_file_writer->close());
1108+
RETURN_IF_ERROR(idx_file_writer->begin_close());
1109+
RETURN_IF_ERROR(idx_file_writer->finish_close());
11011110
}
11021111
_segcompaction_worker->get_inverted_index_file_writer().reset(index_file_writer.release());
11031112
return Status::OK();

be/src/olap/rowset/beta_rowset_writer.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,10 @@ class InvertedIndexFileCollection {
9191

9292
// Close all file writers
9393
// If the inverted index file writer is not closed, an error will be thrown during destruction
94-
Status close();
94+
Status begin_close();
95+
96+
// Wait for all inverted index file writers to be closed
97+
Status finish_close();
9598

9699
// Get inverted index file info in segment id order.
97100
// `seg_id_offset` is the offset of the segment id relative to the subscript of `_inverted_index_file_writers`,
@@ -214,9 +217,11 @@ class BaseBetaRowsetWriter : public RowsetWriter {
214217
// Some index files are written during normal compaction and some files are written during index compaction.
215218
// After all index writes are completed, call this method to write the final compound index file.
216219
Status _close_inverted_index_file_writers() {
217-
RETURN_NOT_OK_STATUS_WITH_WARN(_idx_files.close(),
220+
RETURN_NOT_OK_STATUS_WITH_WARN(_idx_files.begin_close(),
218221
"failed to close index file when build new rowset");
219222
this->_total_index_size += _idx_files.get_total_index_size();
223+
RETURN_NOT_OK_STATUS_WITH_WARN(_idx_files.finish_close(),
224+
"failed to wait close index file when build new rowset");
220225
return Status::OK();
221226
}
222227

be/src/olap/rowset/segment_creator.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,9 @@ Status SegmentFlusher::_internal_parse_variant_columns(vectorized::Block& block)
111111
}
112112

113113
Status SegmentFlusher::close() {
114-
return _seg_files.close();
114+
RETURN_IF_ERROR(_seg_files.close());
115+
RETURN_IF_ERROR(_idx_files.finish_close());
116+
return Status::OK();
115117
}
116118

117119
Status SegmentFlusher::_add_rows(std::unique_ptr<segment_v2::SegmentWriter>& segment_writer,

be/src/olap/rowset/segment_v2/index_file_writer.cpp

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -184,15 +184,15 @@ Result<std::unique_ptr<IndexSearcherBuilder>> IndexFileWriter::_construct_index_
184184
return IndexSearcherBuilder::create_index_searcher_builder(reader_type);
185185
}
186186

187-
Status IndexFileWriter::close() {
187+
Status IndexFileWriter::begin_close() {
188188
DCHECK(!_closed) << debug_string();
189189
_closed = true;
190190
if (_indices_dirs.empty()) {
191191
// An empty file must still be created even if there are no indexes to write
192192
if (dynamic_cast<io::StreamSinkFileWriter*>(_idx_v2_writer.get()) != nullptr ||
193193
dynamic_cast<io::S3FileWriter*>(_idx_v2_writer.get()) != nullptr ||
194194
dynamic_cast<io::PackedFileWriter*>(_idx_v2_writer.get()) != nullptr) {
195-
return _idx_v2_writer->close();
195+
return _idx_v2_writer->close(true);
196196
}
197197
return Status::OK();
198198
}
@@ -224,7 +224,24 @@ Status IndexFileWriter::close() {
224224
err.what());
225225
}
226226
}
227-
LOG_INFO("IndexFileWriter closing, enable_write_index_searcher_cache: {}",
227+
return Status::OK();
228+
}
229+
230+
Status IndexFileWriter::finish_close() {
231+
DCHECK(_closed) << debug_string();
232+
if (_indices_dirs.empty()) {
233+
// An empty file must still be created even if there are no indexes to write
234+
if (dynamic_cast<io::StreamSinkFileWriter*>(_idx_v2_writer.get()) != nullptr ||
235+
dynamic_cast<io::S3FileWriter*>(_idx_v2_writer.get()) != nullptr ||
236+
dynamic_cast<io::PackedFileWriter*>(_idx_v2_writer.get()) != nullptr) {
237+
return _idx_v2_writer->close(false);
238+
}
239+
return Status::OK();
240+
}
241+
if (_idx_v2_writer != nullptr && _idx_v2_writer->state() != io::FileWriter::State::CLOSED) {
242+
RETURN_IF_ERROR(_idx_v2_writer->close(false));
243+
}
244+
LOG_INFO("IndexFileWriter finish_close, enable_write_index_searcher_cache: {}",
228245
config::enable_write_index_searcher_cache);
229246
Status st = Status::OK();
230247
if (config::enable_write_index_searcher_cache) {

be/src/olap/rowset/segment_v2/index_file_writer.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,13 @@ class IndexFileWriter {
5757
Status delete_index(const TabletIndex* index_meta);
5858
Status initialize(InvertedIndexDirectoryMap& indices_dirs);
5959
Status add_into_searcher_cache();
60-
Status close();
60+
// Begin the close process. This mainly triggers the asynchronous close operation of
61+
// _idx_v2_writer by calling close(true), which starts the close process but returns
62+
// immediately without waiting for completion.
63+
Status begin_close();
64+
// Finish the close process. This waits for the close operation to complete by calling
65+
// _idx_v2_writer->close(false), which blocks until the close is fully done.
66+
Status finish_close();
6167
const InvertedIndexFileInfo* get_index_file_info() const {
6268
DCHECK(_closed) << debug_string();
6369
return &_file_info;

be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,7 @@ void DorisFSDirectory::FSIndexOutputV2::close() {
433433
_index_v2_file_writer = nullptr;
434434
})
435435
if (_index_v2_file_writer) {
436-
auto ret = _index_v2_file_writer->close();
436+
auto ret = _index_v2_file_writer->close(true);
437437
DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error",
438438
{ ret = Status::Error<INTERNAL_ERROR>("writer close status error"); })
439439
if (!ret.ok()) {

be/src/olap/rowset/segment_v2/segment_writer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ class SegmentWriter {
148148
*inverted_index_file_size = 0;
149149
return Status::OK();
150150
}
151-
RETURN_IF_ERROR(_index_file_writer->close());
151+
RETURN_IF_ERROR(_index_file_writer->begin_close());
152152
*inverted_index_file_size = _index_file_writer->get_index_file_total_size();
153153
return Status::OK();
154154
}

be/src/olap/rowset/segment_v2/vertical_segment_writer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ class VerticalSegmentWriter {
125125
*inverted_index_file_size = 0;
126126
return Status::OK();
127127
}
128-
RETURN_IF_ERROR(_index_file_writer->close());
128+
RETURN_IF_ERROR(_index_file_writer->begin_close());
129129
*inverted_index_file_size = _index_file_writer->get_index_file_total_size();
130130
return Status::OK();
131131
}

be/src/olap/task/index_builder.cpp

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -368,13 +368,20 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta
368368
_index_file_writers.emplace(seg_ptr->id(), std::move(index_file_writer));
369369
}
370370
for (auto&& [seg_id, index_file_writer] : _index_file_writers) {
371-
auto st = index_file_writer->close();
371+
auto st = index_file_writer->begin_close();
372372
if (!st.ok()) {
373373
LOG(ERROR) << "close index_file_writer error:" << st;
374374
return st;
375375
}
376376
inverted_index_size += index_file_writer->get_index_file_total_size();
377377
}
378+
for (auto&& [seg_id, index_file_writer] : _index_file_writers) {
379+
auto st = index_file_writer->finish_close();
380+
if (!st.ok()) {
381+
LOG(ERROR) << "wait close index_file_writer error:" << st;
382+
return st;
383+
}
384+
}
378385
_index_file_writers.clear();
379386
output_rowset_meta->set_data_disk_size(output_rowset_meta->data_disk_size());
380387
output_rowset_meta->set_total_disk_size(output_rowset_meta->total_disk_size() +
@@ -610,7 +617,7 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta
610617
_olap_data_convertor->reset();
611618
}
612619
for (auto&& [seg_id, index_file_writer] : _index_file_writers) {
613-
auto st = index_file_writer->close();
620+
auto st = index_file_writer->begin_close();
614621
DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_file_writer_close_error", {
615622
st = Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
616623
"debug point: handle_single_rowset_file_writer_close_error");
@@ -621,6 +628,13 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta
621628
}
622629
inverted_index_size += index_file_writer->get_index_file_total_size();
623630
}
631+
for (auto&& [seg_id, index_file_writer] : _index_file_writers) {
632+
auto st = index_file_writer->finish_close();
633+
if (!st.ok()) {
634+
LOG(ERROR) << "wait close index_file_writer error:" << st;
635+
return st;
636+
}
637+
}
624638
_index_column_writers.clear();
625639
_index_file_writers.clear();
626640
output_rowset_meta->set_data_disk_size(output_rowset_meta->data_disk_size());

0 commit comments

Comments
 (0)