diff --git a/be/src/cloud/pb_convert.cpp b/be/src/cloud/pb_convert.cpp index e65a43e7a5cc6a..84ba00d2b91f3c 100644 --- a/be/src/cloud/pb_convert.cpp +++ b/be/src/cloud/pb_convert.cpp @@ -80,6 +80,7 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, const RowsetMetaPB& in) out->set_txn_expiration(in.txn_expiration()); out->set_segments_overlap_pb(in.segments_overlap_pb()); out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated()); + out->mutable_num_segment_rows()->CopyFrom(in.num_segment_rows()); out->mutable_segments_file_size()->CopyFrom(in.segments_file_size()); out->set_index_id(in.index_id()); if (in.has_schema_version()) { @@ -151,6 +152,7 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) { out->set_txn_expiration(in.txn_expiration()); out->set_segments_overlap_pb(in.segments_overlap_pb()); out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated()); + out->mutable_num_segment_rows()->Swap(in.mutable_num_segment_rows()); out->mutable_segments_file_size()->Swap(in.mutable_segments_file_size()); out->set_index_id(in.index_id()); if (in.has_schema_version()) { @@ -234,6 +236,7 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in) out->set_txn_expiration(in.txn_expiration()); out->set_segments_overlap_pb(in.segments_overlap_pb()); out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated()); + out->mutable_num_segment_rows()->CopyFrom(in.num_segment_rows()); out->mutable_segments_file_size()->CopyFrom(in.segments_file_size()); out->set_index_id(in.index_id()); if (in.has_schema_version()) { @@ -305,6 +308,7 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in) { out->set_txn_expiration(in.txn_expiration()); out->set_segments_overlap_pb(in.segments_overlap_pb()); out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated()); + out->mutable_num_segment_rows()->Swap(in.mutable_num_segment_rows()); 
out->mutable_segments_file_size()->Swap(in.mutable_segments_file_size()); out->set_index_id(in.index_id()); if (in.has_schema_version()) { diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 5f78c7d9294a30..3bf58be9bc2a12 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -384,6 +384,12 @@ DEFINE_mInt32(trash_file_expire_time_sec, "0"); // modify them upon necessity DEFINE_Int32(min_file_descriptor_number, "60000"); DEFINE_mBool(disable_segment_cache, "false"); +// Enable checking segment rows consistency between rowset meta and segment footer +DEFINE_mBool(enable_segment_rows_consistency_check, "false"); +DEFINE_mBool(enable_segment_rows_check_core, "false"); +// ATTENTION: For test only. In test environment, there are no historical data, +// so all rowset meta should have segment rows info. +DEFINE_mBool(fail_when_segment_rows_not_in_rowset_meta, "false"); DEFINE_String(row_cache_mem_limit, "20%"); // Cache for storage page size diff --git a/be/src/common/config.h b/be/src/common/config.h index c9d9fe94ffbdca..8c54e6063983ce 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -432,6 +432,12 @@ DECLARE_mInt32(trash_file_expire_time_sec); // modify them upon necessity DECLARE_Int32(min_file_descriptor_number); DECLARE_mBool(disable_segment_cache); +// Enable checking segment rows consistency between rowset meta and segment footer +DECLARE_mBool(enable_segment_rows_consistency_check); +DECLARE_mBool(enable_segment_rows_check_core); +// ATTENTION: For test only. In test environment, there are no historical data, +// so all rowset meta should have segment rows info. 
+DECLARE_mBool(fail_when_segment_rows_not_in_rowset_meta); DECLARE_String(row_cache_mem_limit); // Cache for storage page size diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 4e9297983a5185..1e75d5bea3c1aa 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -340,6 +340,7 @@ Status CompactionMixin::do_compact_ordered_rowsets() { auto seg_id = 0; bool segments_key_bounds_truncated {false}; std::vector segment_key_bounds; + std::vector num_segment_rows; for (auto rowset : _input_rowsets) { RETURN_IF_ERROR(rowset->link_files_to(tablet()->tablet_path(), _output_rs_writer->rowset_id(), seg_id)); @@ -348,6 +349,10 @@ Status CompactionMixin::do_compact_ordered_rowsets() { std::vector key_bounds; RETURN_IF_ERROR(rowset->get_segments_key_bounds(&key_bounds)); segment_key_bounds.insert(segment_key_bounds.end(), key_bounds.begin(), key_bounds.end()); + std::vector input_segment_rows; + rowset->get_num_segment_rows(&input_segment_rows); + num_segment_rows.insert(num_segment_rows.end(), input_segment_rows.begin(), + input_segment_rows.end()); } // build output rowset RowsetMetaSharedPtr rowset_meta = std::make_shared(); @@ -361,6 +366,7 @@ Status CompactionMixin::do_compact_ordered_rowsets() { rowset_meta->set_rowset_state(VISIBLE); rowset_meta->set_segments_key_bounds_truncated(segments_key_bounds_truncated); rowset_meta->set_segments_key_bounds(segment_key_bounds); + rowset_meta->set_num_segment_rows(num_segment_rows); _output_rowset = _output_rs_writer->manual_build(rowset_meta); diff --git a/be/src/olap/parallel_scanner_builder.cpp b/be/src/olap/parallel_scanner_builder.cpp index ec11c42aa7e965..50a764cc72a11c 100644 --- a/be/src/olap/parallel_scanner_builder.cpp +++ b/be/src/olap/parallel_scanner_builder.cpp @@ -222,6 +222,9 @@ Status ParallelScannerBuilder::_build_scanners_by_per_segment(std::listquery_options().__isset.enable_segment_cache + ? 
_state->query_options().enable_segment_cache + : true; for (auto&& [tablet, version] : _tablets) { const auto tablet_id = tablet->tablet_id(); _all_read_sources[tablet_id] = _read_sources[idx]; @@ -233,7 +236,8 @@ Status ParallelScannerBuilder::_load() { auto beta_rowset = std::dynamic_pointer_cast(rowset); std::vector segment_rows; - RETURN_IF_ERROR(beta_rowset->get_segment_num_rows(&segment_rows, &_builder_stats)); + RETURN_IF_ERROR(beta_rowset->get_segment_num_rows(&segment_rows, enable_segment_cache, + &_builder_stats)); auto segment_count = rowset->num_segments(); for (int64_t i = 0; i != segment_count; i++) { _all_segments_rows[rowset_id].emplace_back(segment_rows[i]); diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp index e6b1beb7208fa3..35fbf1dff1e46e 100644 --- a/be/src/olap/rowset/beta_rowset.cpp +++ b/be/src/olap/rowset/beta_rowset.cpp @@ -29,9 +29,11 @@ #include #include "beta_rowset.h" +#include "cloud/config.h" #include "common/config.h" #include "common/logging.h" #include "common/status.h" +#include "cpp/sync_point.h" #include "io/fs/file_reader.h" #include "io/fs/file_system.h" #include "io/fs/local_file_system.h" @@ -71,24 +73,97 @@ Status BetaRowset::init() { return Status::OK(); // no op } +namespace { +Status load_segment_rows_from_footer(BetaRowsetSharedPtr rowset, + std::vector* segment_rows, bool enable_segment_cache, + OlapReaderStatistics* read_stats) { + SegmentCacheHandle segment_cache_handle; + RETURN_IF_ERROR(SegmentLoader::instance()->load_segments( + rowset, &segment_cache_handle, enable_segment_cache, false, read_stats)); + for (const auto& segment : segment_cache_handle.get_segments()) { + segment_rows->emplace_back(segment->num_rows()); + } + return Status::OK(); +} + +Status check_segment_rows_consistency(const std::vector& rows_from_meta, + const std::vector& rows_from_footer, + int64_t tablet_id, const std::string& rowset_id) { + DCHECK_EQ(rows_from_footer.size(), rows_from_meta.size()); + 
for (size_t i = 0; i < rows_from_footer.size(); i++) { + if (rows_from_footer[i] != rows_from_meta[i]) { + auto msg = fmt::format( + "segment rows mismatch between rowset meta and segment footer. " + "segment index: {}, meta rows: {}, footer rows: {}, tablet={}, rowset={}", + i, rows_from_meta[i], rows_from_footer[i], tablet_id, rowset_id); + if (config::enable_segment_rows_check_core) { + CHECK(false) << msg; + } + return Status::InternalError(msg); + } + } + return Status::OK(); +} +} // namespace + Status BetaRowset::get_segment_num_rows(std::vector* segment_rows, + bool enable_segment_cache, OlapReaderStatistics* read_stats) { +#ifndef BE_TEST // `ROWSET_UNLOADING` is state for closed() called but owned by some readers. // So here `ROWSET_UNLOADING` is allowed. DCHECK_NE(_rowset_state_machine.rowset_state(), ROWSET_UNLOADED); - - RETURN_IF_ERROR(_load_segment_rows_once.call([this, read_stats] { +#endif + RETURN_IF_ERROR(_load_segment_rows_once.call([this, enable_segment_cache, read_stats] { auto segment_count = num_segments(); - _segments_rows.resize(segment_count); - for (int64_t i = 0; i != segment_count; ++i) { - SegmentCacheHandle segment_cache_handle; - RETURN_IF_ERROR(SegmentLoader::instance()->load_segment( - std::static_pointer_cast(shared_from_this()), i, - &segment_cache_handle, false, false, read_stats)); - const auto& tmp_segments = segment_cache_handle.get_segments(); - _segments_rows[i] = tmp_segments[0]->num_rows(); + if (segment_count == 0) { + return Status::OK(); } - return Status::OK(); + + if (!_rowset_meta->get_num_segment_rows().empty()) { + if (_rowset_meta->get_num_segment_rows().size() == segment_count) { + // use segment rows in rowset meta if eligible + TEST_SYNC_POINT("BetaRowset::get_segment_num_rows:use_segment_rows_from_meta"); + _segments_rows.assign(_rowset_meta->get_num_segment_rows().cbegin(), + _rowset_meta->get_num_segment_rows().cend()); + if (config::enable_segment_rows_consistency_check) { + // verify segment rows from 
meta match segment footer + std::vector rows_from_footer; + auto self = std::dynamic_pointer_cast(shared_from_this()); + auto load_status = load_segment_rows_from_footer( + self, &rows_from_footer, enable_segment_cache, read_stats); + if (load_status.ok()) { + return check_segment_rows_consistency( + _segments_rows, rows_from_footer, _rowset_meta->tablet_id(), + _rowset_meta->rowset_id().to_string()); + } + } + return Status::OK(); + } else { + auto msg = fmt::format( + "[verbose] corrupted segment rows info in rowset meta. " + "segment count: {}, segment rows size: {}, tablet={}, rowset={}", + segment_count, _rowset_meta->get_num_segment_rows().size(), + _rowset_meta->tablet_id(), _rowset_meta->rowset_id().to_string()); + if (config::enable_segment_rows_check_core) { + CHECK(false) << msg; + } + LOG_EVERY_SECOND(WARNING) << msg; + } + } + if (config::fail_when_segment_rows_not_in_rowset_meta) { + CHECK(false) << "[verbose] segment rows info not found in rowset meta. tablet=" + << _rowset_meta->tablet_id() + << ", rowset=" << _rowset_meta->rowset_id().to_string() + << ", version=" << _rowset_meta->version() + << ", debug_string=" << _rowset_meta->debug_string() + << ", stack=" << Status::InternalError("error"); + } + // otherwise, read it from segment footer + TEST_SYNC_POINT("BetaRowset::get_segment_num_rows:load_from_segment_footer"); + auto self = std::dynamic_pointer_cast(shared_from_this()); + return load_segment_rows_from_footer(self, &_segments_rows, enable_segment_cache, + read_stats); })); segment_rows->assign(_segments_rows.cbegin(), _segments_rows.cend()); return Status::OK(); diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h index f0cd5c966cdb8e..d5fa6ce4677998 100644 --- a/be/src/olap/rowset/beta_rowset.h +++ b/be/src/olap/rowset/beta_rowset.h @@ -91,7 +91,7 @@ class BetaRowset final : public Rowset { Status show_nested_index_file(rapidjson::Value* rowset_value, rapidjson::Document::AllocatorType& allocator); - Status 
get_segment_num_rows(std::vector* segment_rows, + Status get_segment_num_rows(std::vector* segment_rows, bool enable_segment_cache, OlapReaderStatistics* read_stats); protected: diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index b7817ea2b75be0..26b8d360449cd9 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -258,7 +258,7 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context if (_read_context->record_rowids && _read_context->rowid_conversion) { // init segment rowid map for rowid conversion std::vector segment_rows; - RETURN_IF_ERROR(_rowset->get_segment_num_rows(&segment_rows, _stats)); + RETURN_IF_ERROR(_rowset->get_segment_num_rows(&segment_rows, should_use_cache, _stats)); RETURN_IF_ERROR(_read_context->rowid_conversion->init_segment_map(rowset()->rowset_id(), segment_rows)); } diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index fbaaf8b57400db..6a99f1d447bb9b 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -29,8 +29,10 @@ #include #include #include +#include // IWYU pragma: no_include +#include "common/cast_set.h" #include "common/compiler_util.h" // IWYU pragma: keep #include "common/config.h" #include "common/logging.h" @@ -97,6 +99,9 @@ void build_rowset_meta_with_spec_field(RowsetMeta& rowset_meta, std::vector segments_key_bounds; spec_rowset_meta.get_segments_key_bounds(&segments_key_bounds); rowset_meta.set_segments_key_bounds(segments_key_bounds); + std::vector num_segment_rows; + spec_rowset_meta.get_num_segment_rows(&num_segment_rows); + rowset_meta.set_num_segment_rows(num_segment_rows); } } // namespace @@ -777,6 +782,7 @@ Status BaseBetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) { _num_segment += cast_set(rowset->num_segments()); // append key_bounds to current rowset 
RETURN_IF_ERROR(rowset->get_segments_key_bounds(&_segments_encoded_key_bounds)); + rowset->get_num_segment_rows(&_segment_num_rows); _segments_key_bounds_truncated = rowset->rowset_meta()->is_segments_key_bounds_truncated(); // TODO update zonemap @@ -956,6 +962,7 @@ Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool ch int64_t total_data_size = 0; int64_t total_index_size = 0; std::vector segments_encoded_key_bounds; + std::vector segment_rows; { std::lock_guard lock(_segid_statistics_map_mutex); for (const auto& itr : _segid_statistics_map) { @@ -963,14 +970,23 @@ Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool ch total_data_size += itr.second.data_size; total_index_size += itr.second.index_size; segments_encoded_key_bounds.push_back(itr.second.key_bounds); + // segcompaction don't modify _segment_num_rows, so we need to get segment rows from _segid_statistics_map for load + segment_rows.push_back(cast_set(itr.second.row_num)); } } + if (segment_rows.empty()) { + // vertical compaction and linked schema change will not record segment statistics, + // it will record segment rows in _segment_num_rows + RETURN_IF_ERROR(get_segment_num_rows(&segment_rows)); + } + for (auto& key_bound : _segments_encoded_key_bounds) { segments_encoded_key_bounds.push_back(key_bound); } if (_segments_key_bounds_truncated.has_value()) { rowset_meta->set_segments_key_bounds_truncated(_segments_key_bounds_truncated.value()); } + rowset_meta->set_num_segment_rows(segment_rows); // segment key bounds are empty in old version(before version 1.2.x). So we should not modify // the overlap property when key bounds are empty. 
// for mow table with cluster keys, the overlap is used for cluster keys, @@ -991,6 +1007,13 @@ Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool ch "is: {}, _num_seg is: {}", segments_encoded_key_bounds_size, segment_num); } + if (segment_rows.size() != segment_num) { + return Status::InternalError( + "segment_rows size should equal to _num_seg, segment_rows size is: {}, " + "_num_seg is {}, tablet={}, rowset={}, txn={}", + segment_rows.size(), segment_num, _context.tablet_id, + _context.rowset_id.to_string(), _context.txn_id); + } } rowset_meta->set_num_segments(segment_num); diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h index 9ab9ca3356b5f7..2d2a6267ff8079 100644 --- a/be/src/olap/rowset/rowset.h +++ b/be/src/olap/rowset/rowset.h @@ -272,6 +272,10 @@ class Rowset : public std::enable_shared_from_this, public MetadataAdder return Status::OK(); } + void get_num_segment_rows(std::vector* num_segment_rows) { + _rowset_meta->get_num_segment_rows(num_segment_rows); + } + // min key of the first segment bool first_key(std::string* min_key) { KeyBoundsPB key_bounds; diff --git a/be/src/olap/rowset/rowset_meta.cpp b/be/src/olap/rowset/rowset_meta.cpp index 63408a8452e838..632304ae5f3cae 100644 --- a/be/src/olap/rowset/rowset_meta.cpp +++ b/be/src/olap/rowset/rowset_meta.cpp @@ -26,6 +26,7 @@ #include "cloud/cloud_storage_engine.h" #include "common/logging.h" #include "common/status.h" +#include "cpp/sync_point.h" #include "google/protobuf/util/message_differencer.h" #include "io/fs/encrypted_fs_factory.h" #include "io/fs/file_system.h" @@ -325,6 +326,20 @@ void RowsetMeta::merge_rowset_meta(const RowsetMeta& other) { set_total_disk_size(data_disk_size() + index_disk_size()); set_segments_key_bounds_truncated(is_segments_key_bounds_truncated() || other.is_segments_key_bounds_truncated()); + if (_rowset_meta_pb.num_segment_rows_size() > 0) { + if (other.num_segments() > 0) { + if 
(other._rowset_meta_pb.num_segment_rows_size() > 0) { + for (auto row_count : other._rowset_meta_pb.num_segment_rows()) { + _rowset_meta_pb.add_num_segment_rows(row_count); + } + } else { + // This may happen when a partial update load commits in high version doris_be + // and publishes with new segments in low version doris_be. In this case, just clear + // all num_segment_rows. + _rowset_meta_pb.clear_num_segment_rows(); + } + } for (auto&& key_bound : other.get_segments_key_bounds()) { add_segment_key_bounds(key_bound); } @@ -343,6 +358,7 @@ void RowsetMeta::merge_rowset_meta(const RowsetMeta& other) { } // In partial update the rowset schema maybe updated when table contains variant type, so we need the newest schema to be updated // Otherwise the schema is stale and lead to wrong data read + TEST_SYNC_POINT_RETURN_WITH_VOID("RowsetMeta::merge_rowset_meta:skip_schema_merge"); if (tablet_schema()->num_variant_columns() > 0) { // merge extracted columns TabletSchemaSPtr merged_schema; diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h index c6f81f4b6f86e3..df2d51d7edaa5f 100644 --- a/be/src/olap/rowset/rowset_meta.h +++ b/be/src/olap/rowset/rowset_meta.h @@ -147,6 +147,18 @@ class RowsetMeta : public MetadataAdder { void set_num_rows(int64_t num_rows) { _rowset_meta_pb.set_num_rows(num_rows); } + void set_num_segment_rows(const std::vector& num_segment_rows) { + _rowset_meta_pb.mutable_num_segment_rows()->Assign(num_segment_rows.cbegin(), + num_segment_rows.cend()); + } + + void get_num_segment_rows(std::vector* num_segment_rows) const { + num_segment_rows->assign(_rowset_meta_pb.num_segment_rows().cbegin(), + _rowset_meta_pb.num_segment_rows().cend()); + } + + auto& get_num_segment_rows() const { return _rowset_meta_pb.num_segment_rows(); } + int64_t total_disk_size() const { return _rowset_meta_pb.total_disk_size(); } void set_total_disk_size(int64_t total_disk_size) { @@ -437,6 +449,8 @@ class RowsetMeta : public MetadataAdder
{ int32_t schema_version() const { return _rowset_meta_pb.schema_version(); } + std::string debug_string() const { return _rowset_meta_pb.ShortDebugString(); } + private: bool _deserialize_from_pb(std::string_view value); diff --git a/be/src/olap/task/index_builder.cpp b/be/src/olap/task/index_builder.cpp index a3ee9171fcf11b..8dab082cdef91f 100644 --- a/be/src/olap/task/index_builder.cpp +++ b/be/src/olap/task/index_builder.cpp @@ -307,6 +307,9 @@ Status IndexBuilder::update_inverted_index_info() { rowset_meta->set_segments_key_bounds_truncated( input_rowset_meta->is_segments_key_bounds_truncated()); rowset_meta->set_segments_key_bounds(key_bounds); + std::vector num_segment_rows; + input_rowset_meta->get_num_segment_rows(&num_segment_rows); + rowset_meta->set_num_segment_rows(num_segment_rows); auto output_rowset = output_rs_writer->manual_build(rowset_meta); if (input_rowset_meta->has_delete_predicate()) { output_rowset->rowset_meta()->set_delete_predicate( diff --git a/be/test/olap/rowid_conversion_test.cpp b/be/test/olap/rowid_conversion_test.cpp index 6390bdcbb714e3..d659a19e955037 100644 --- a/be/test/olap/rowid_conversion_test.cpp +++ b/be/test/olap/rowid_conversion_test.cpp @@ -384,7 +384,7 @@ class TestRowIdConversion : public testing::TestWithParam(out_rowset); std::vector segment_num_rows; OlapReaderStatistics statistics; - EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows, &statistics).ok()); + EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows, false, &statistics).ok()); if (has_delete_handler) { // All keys less than 1000 are deleted by delete handler for (auto& item : output_data) { diff --git a/be/test/olap/rowset/beta_rowset_test.cpp b/be/test/olap/rowset/beta_rowset_test.cpp index 730999e650adc0..3885620566f3f3 100644 --- a/be/test/olap/rowset/beta_rowset_test.cpp +++ b/be/test/olap/rowset/beta_rowset_test.cpp @@ -39,6 +39,7 @@ #include "common/config.h" #include "common/status.h" +#include "cpp/sync_point.h" #include 
"gen_cpp/olap_file.pb.h" #include "gtest/gtest_pred_impl.h" #include "io/fs/file_system.h" @@ -413,4 +414,167 @@ TEST_F(BetaRowsetTest, GetIndexFileNames) { } } +TEST_F(BetaRowsetTest, GetSegmentNumRowsFromMeta) { + // Test getting segment rows from rowset meta (new version data) + // This test verifies that when segment_rows is present in rowset meta, + // it uses the cached data directly without loading segments + auto tablet_schema = std::make_shared(); + create_tablet_schema(tablet_schema); + + auto rowset_meta = std::make_shared(); + init_rs_meta(rowset_meta, 1, 1); + rowset_meta->set_num_segments(3); + + // Set segment rows in rowset meta (simulating new version data) + std::vector expected_segment_rows = {100, 200, 300}; + rowset_meta->set_num_segment_rows(expected_segment_rows); + + auto rowset = std::make_shared(tablet_schema, rowset_meta, ""); + + // Use sync point to verify code path + auto sp = SyncPoint::get_instance(); + bool used_meta_path = false; + bool used_footer_path = false; + + sp->set_call_back("BetaRowset::get_segment_num_rows:use_segment_rows_from_meta", + [&](auto&& args) { used_meta_path = true; }); + + sp->set_call_back("BetaRowset::get_segment_num_rows:load_from_segment_footer", + [&](auto&& args) { used_footer_path = true; }); + + sp->enable_processing(); + + std::vector segment_rows; + Status st = rowset->get_segment_num_rows(&segment_rows, false, &_stats); + ASSERT_TRUE(st.ok()) << st; + ASSERT_EQ(segment_rows.size(), 3); + ASSERT_EQ(segment_rows[0], 100); + ASSERT_EQ(segment_rows[1], 200); + ASSERT_EQ(segment_rows[2], 300); + + // Verify that we used the meta path and not the footer path + ASSERT_TRUE(used_meta_path); + ASSERT_FALSE(used_footer_path); + + // Test calling get_segment_num_rows twice to verify cache works + used_meta_path = false; + used_footer_path = false; + std::vector segment_rows_2; + st = rowset->get_segment_num_rows(&segment_rows_2, false, &_stats); + ASSERT_TRUE(st.ok()) << st; + 
ASSERT_EQ(segment_rows_2.size(), 3); + ASSERT_EQ(segment_rows_2[0], 100); + ASSERT_EQ(segment_rows_2[1], 200); + ASSERT_EQ(segment_rows_2[2], 300); + + EXPECT_FALSE(used_meta_path); + EXPECT_FALSE(used_footer_path); + + sp->clear_all_call_backs(); + sp->disable_processing(); + sp->clear_trace(); +} + +TEST_F(BetaRowsetTest, GetSegmentNumRowsEmptyMeta) { + // Test when rowset meta has no segment rows (old version data) + // In this case, it should try to load segments from segment footer + auto tablet_schema = std::make_shared(); + create_tablet_schema(tablet_schema); + + auto rowset_meta = std::make_shared(); + init_rs_meta(rowset_meta, 1, 1); + rowset_meta->set_num_segments(2); + // segment_rows is empty (simulating old version data) + + auto rowset = std::make_shared(tablet_schema, rowset_meta, ""); + + // Use sync point to verify code path + auto sp = SyncPoint::get_instance(); + bool used_meta_path = false; + bool used_footer_path = false; + + sp->set_call_back("BetaRowset::get_segment_num_rows:use_segment_rows_from_meta", + [&](auto&& args) { used_meta_path = true; }); + + sp->set_call_back("BetaRowset::get_segment_num_rows:load_from_segment_footer", + [&](auto&& args) { used_footer_path = true; }); + + sp->enable_processing(); + + std::vector segment_rows; + Status st = rowset->get_segment_num_rows(&segment_rows, false, &_stats); + + // Since we don't have actual segment files, it will fail to load segments + // But the important thing is to verify it tried to load from footer + ASSERT_TRUE(used_footer_path); + ASSERT_FALSE(used_meta_path); + + sp->clear_all_call_backs(); + sp->disable_processing(); + sp->clear_trace(); +} + +TEST_F(BetaRowsetTest, GetSegmentNumRowsCorruptedMeta) { + // Test when segment_rows size doesn't match segment count + // This simulates a corrupted rowset meta + auto tablet_schema = std::make_shared(); + create_tablet_schema(tablet_schema); + + auto rowset_meta = std::make_shared(); + init_rs_meta(rowset_meta, 1, 1); + 
rowset_meta->set_num_segments(3); + + // Set segment rows with wrong size (should be 3 but only has 2) + std::vector wrong_segment_rows = {100, 200}; + rowset_meta->set_num_segment_rows(wrong_segment_rows); + + auto rowset = std::make_shared(tablet_schema, rowset_meta, ""); + + // Use sync point to verify code path + auto sp = SyncPoint::get_instance(); + bool used_meta_path = false; + bool used_footer_path = false; + + sp->set_call_back("BetaRowset::get_segment_num_rows:use_segment_rows_from_meta", + [&](auto&& args) { used_meta_path = true; }); + + sp->set_call_back("BetaRowset::get_segment_num_rows:load_from_segment_footer", + [&](auto&& args) { used_footer_path = true; }); + + sp->enable_processing(); + + std::vector segment_rows; + Status st = rowset->get_segment_num_rows(&segment_rows, false, &_stats); + + // When segment_rows size doesn't match, it should fall back to loading from footer + ASSERT_FALSE(used_meta_path); + ASSERT_TRUE(used_footer_path); + + sp->clear_all_call_backs(); + sp->disable_processing(); + sp->clear_trace(); +} + +TEST_F(BetaRowsetTest, GetNumSegmentRowsAPI) { + // Test the simple get_num_segment_rows API (without loading) + auto tablet_schema = std::make_shared(); + create_tablet_schema(tablet_schema); + + auto rowset_meta = std::make_shared(); + init_rs_meta(rowset_meta, 1, 1); + rowset_meta->set_num_segments(3); + + std::vector expected_segment_rows = {100, 200, 300}; + rowset_meta->set_num_segment_rows(expected_segment_rows); + + auto rowset = std::make_shared(tablet_schema, rowset_meta, ""); + + std::vector segment_rows; + rowset->get_num_segment_rows(&segment_rows); + ASSERT_EQ(segment_rows.size(), 3); + ASSERT_EQ(segment_rows[0], 100); + ASSERT_EQ(segment_rows[1], 200); + ASSERT_EQ(segment_rows[2], 300); +} + } // namespace doris diff --git a/be/test/olap/rowset/rowset_meta_test.cpp b/be/test/olap/rowset/rowset_meta_test.cpp index cb1b2865c1440e..c78b5803f03a82 100644 --- a/be/test/olap/rowset/rowset_meta_test.cpp +++ 
b/be/test/olap/rowset/rowset_meta_test.cpp @@ -29,6 +29,7 @@ #include #include "common/status.h" +#include "cpp/sync_point.h" #include "gtest/gtest_pred_impl.h" #include "olap/olap_common.h" #include "olap/olap_meta.h" @@ -123,4 +124,183 @@ TEST_F(RowsetMetaTest, TestRowsetIdInit) { EXPECT_EQ(id.to_string(), "72057594037927935"); } +TEST_F(RowsetMetaTest, TestNumSegmentRowsSetAndGet) { + RowsetMeta rowset_meta; + EXPECT_TRUE(rowset_meta.init_from_json(_json_rowset_meta)); + + // Test set_num_segment_rows and get_num_segment_rows + std::vector num_segment_rows = {100, 200, 300}; + rowset_meta.set_num_segment_rows(num_segment_rows); + + std::vector retrieved_rows; + rowset_meta.get_num_segment_rows(&retrieved_rows); + + EXPECT_EQ(retrieved_rows.size(), 3); + EXPECT_EQ(retrieved_rows[0], 100); + EXPECT_EQ(retrieved_rows[1], 200); + EXPECT_EQ(retrieved_rows[2], 300); + + // Test get_num_segment_rows() const reference + const auto& num_segment_rows_ref = rowset_meta.get_num_segment_rows(); + EXPECT_EQ(num_segment_rows_ref.size(), 3); + EXPECT_EQ(num_segment_rows_ref.Get(0), 100); + EXPECT_EQ(num_segment_rows_ref.Get(1), 200); + EXPECT_EQ(num_segment_rows_ref.Get(2), 300); + + // Test serialization and deserialization + RowsetMetaPB rowset_meta_pb; + rowset_meta.to_rowset_pb(&rowset_meta_pb); + EXPECT_EQ(rowset_meta_pb.num_segment_rows_size(), 3); + EXPECT_EQ(rowset_meta_pb.num_segment_rows(0), 100); + EXPECT_EQ(rowset_meta_pb.num_segment_rows(1), 200); + EXPECT_EQ(rowset_meta_pb.num_segment_rows(2), 300); + + RowsetMeta rowset_meta_2; + rowset_meta_2.init_from_pb(rowset_meta_pb); + std::vector retrieved_rows_2; + rowset_meta_2.get_num_segment_rows(&retrieved_rows_2); + EXPECT_EQ(retrieved_rows_2.size(), 3); + EXPECT_EQ(retrieved_rows_2[0], 100); + EXPECT_EQ(retrieved_rows_2[1], 200); + EXPECT_EQ(retrieved_rows_2[2], 300); +} + +TEST_F(RowsetMetaTest, TestNumSegmentRowsEmpty) { + RowsetMeta rowset_meta; + EXPECT_TRUE(rowset_meta.init_from_json(_json_rowset_meta)); + + // 
By default, num_segment_rows should be empty + std::vector retrieved_rows; + rowset_meta.get_num_segment_rows(&retrieved_rows); + EXPECT_EQ(retrieved_rows.size(), 0); + + const auto& num_segment_rows_ref = rowset_meta.get_num_segment_rows(); + EXPECT_EQ(num_segment_rows_ref.size(), 0); +} + +TEST_F(RowsetMetaTest, TestMergeRowsetMetaWithNumSegmentRows) { + RowsetMeta rowset_meta_1; + EXPECT_TRUE(rowset_meta_1.init_from_json(_json_rowset_meta)); + std::vector num_segment_rows_1 = {100, 200}; + rowset_meta_1.set_num_segment_rows(num_segment_rows_1); + rowset_meta_1.set_num_segments(2); + rowset_meta_1.set_total_disk_size(1000); + rowset_meta_1.set_data_disk_size(800); + rowset_meta_1.set_index_disk_size(200); + + RowsetMeta rowset_meta_2; + EXPECT_TRUE(rowset_meta_2.init_from_json(_json_rowset_meta)); + std::vector num_segment_rows_2 = {300, 400, 500}; + rowset_meta_2.set_num_segment_rows(num_segment_rows_2); + rowset_meta_2.set_num_segments(3); + rowset_meta_2.set_total_disk_size(2000); + rowset_meta_2.set_data_disk_size(1600); + rowset_meta_2.set_index_disk_size(400); + + // Use sync point to skip schema merge logic + auto sp = SyncPoint::get_instance(); + bool skip_called = false; + sp->set_call_back("RowsetMeta::merge_rowset_meta:skip_schema_merge", [&](auto&& args) { + skip_called = true; + // Set the return flag to skip the schema merge logic + auto pred = try_any_cast(args.back()); + *pred = true; + }); + sp->enable_processing(); + + // Merge rowset_meta_2 into rowset_meta_1 + rowset_meta_1.merge_rowset_meta(rowset_meta_2); + + EXPECT_TRUE(skip_called); + + sp->clear_all_call_backs(); + sp->disable_processing(); + sp->clear_trace(); + + // Check merged num_segment_rows + std::vector merged_rows; + rowset_meta_1.get_num_segment_rows(&merged_rows); + EXPECT_EQ(merged_rows.size(), 5); + EXPECT_EQ(merged_rows[0], 100); + EXPECT_EQ(merged_rows[1], 200); + EXPECT_EQ(merged_rows[2], 300); + EXPECT_EQ(merged_rows[3], 400); + EXPECT_EQ(merged_rows[4], 500); + + // 
Check merged num_segments + EXPECT_EQ(rowset_meta_1.num_segments(), 5); + + // Check merged disk sizes + EXPECT_EQ(rowset_meta_1.total_disk_size(), 3000); +} + +TEST_F(RowsetMetaTest, TestMergeRowsetMetaWithPartialNumSegmentRows) { + RowsetMeta rowset_meta_1; + EXPECT_TRUE(rowset_meta_1.init_from_json(_json_rowset_meta)); + std::vector num_segment_rows_1 = {100, 200}; + rowset_meta_1.set_num_segment_rows(num_segment_rows_1); + rowset_meta_1.set_num_segments(2); + + RowsetMeta rowset_meta_2; + EXPECT_TRUE(rowset_meta_2.init_from_json(_json_rowset_meta)); + // rowset_meta_2 has no num_segment_rows (simulating old version data) + rowset_meta_2.set_num_segments(3); + + // Use sync point to skip schema merge logic + auto sp = SyncPoint::get_instance(); + sp->set_call_back("RowsetMeta::merge_rowset_meta:skip_schema_merge", [&](auto&& args) { + auto pred = try_any_cast(args.back()); + *pred = true; + }); + sp->enable_processing(); + + // Merge rowset_meta_2 into rowset_meta_1 + rowset_meta_1.merge_rowset_meta(rowset_meta_2); + + sp->clear_all_call_backs(); + sp->disable_processing(); + sp->clear_trace(); + + // num_segment_rows should be cleared when one of them is empty + std::vector merged_rows; + rowset_meta_1.get_num_segment_rows(&merged_rows); + EXPECT_EQ(merged_rows.size(), 0); + + // num_segments should still be merged + EXPECT_EQ(rowset_meta_1.num_segments(), 5); +} + +TEST_F(RowsetMetaTest, TestMergeRowsetMetaBothEmpty) { + RowsetMeta rowset_meta_1; + EXPECT_TRUE(rowset_meta_1.init_from_json(_json_rowset_meta)); + rowset_meta_1.set_num_segments(2); + + RowsetMeta rowset_meta_2; + EXPECT_TRUE(rowset_meta_2.init_from_json(_json_rowset_meta)); + rowset_meta_2.set_num_segments(3); + + // Use sync point to skip schema merge logic + auto sp = SyncPoint::get_instance(); + sp->set_call_back("RowsetMeta::merge_rowset_meta:skip_schema_merge", [&](auto&& args) { + auto pred = try_any_cast(args.back()); + *pred = true; + }); + sp->enable_processing(); + + // Merge 
rowset_meta_2 into rowset_meta_1 + rowset_meta_1.merge_rowset_meta(rowset_meta_2); + + sp->clear_all_call_backs(); + sp->disable_processing(); + sp->clear_trace(); + + // num_segment_rows should remain empty + std::vector<int64_t> merged_rows; + rowset_meta_1.get_num_segment_rows(&merged_rows); + EXPECT_EQ(merged_rows.size(), 0); + + // num_segments should still be merged + EXPECT_EQ(rowset_meta_1.num_segments(), 5); +} + } // namespace doris diff --git a/be/test/olap/segcompaction_mow_test.cpp b/be/test/olap/segcompaction_mow_test.cpp index d1e2b44ce4df73..92e20da4efb95c 100644 --- a/be/test/olap/segcompaction_mow_test.cpp +++ b/be/test/olap/segcompaction_mow_test.cpp @@ -287,7 +287,7 @@ class SegCompactionMoWTest : public ::testing::TestWithParam { auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset); std::vector<uint32_t> segment_num_rows; OlapReaderStatistics stats; - EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows, &stats).ok()); + EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows, false, &stats).ok()); size_t total_num_rows = 0; for (const auto& i : segment_num_rows) { total_num_rows += i; diff --git a/be/test/olap/segcompaction_test.cpp b/be/test/olap/segcompaction_test.cpp index c599725c18df78..51487ef236c83c 100644 --- a/be/test/olap/segcompaction_test.cpp +++ b/be/test/olap/segcompaction_test.cpp @@ -393,7 +393,7 @@ TEST_F(SegCompactionTest, SegCompactionThenRead) { auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset); std::vector<uint32_t> segment_num_rows; OlapReaderStatistics stats; - EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows, &stats).ok()); + EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows, false, &stats).ok()); size_t total_num_rows = 0; for (const auto& i : segment_num_rows) { total_num_rows += i; @@ -903,7 +903,7 @@ TEST_F(SegCompactionTest, SegCompactionThenReadUniqueTableSmall) { auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset); std::vector<uint32_t> segment_num_rows; OlapReaderStatistics stats; - 
EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows, &stats).ok()); + EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows, false, &stats).ok()); size_t total_num_rows = 0; for (const auto& i : segment_num_rows) { total_num_rows += i; @@ -1172,7 +1172,7 @@ TEST_F(SegCompactionTest, SegCompactionThenReadAggTableSmall) { auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset); std::vector<uint32_t> segment_num_rows; OlapReaderStatistics stats; - EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows, &stats).ok()); + EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows, false, &stats).ok()); size_t total_num_rows = 0; for (const auto& i : segment_num_rows) { total_num_rows += i; diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index cac152a6a55d8c..03e59c5f7ee78b 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -137,6 +137,9 @@ message RowsetMetaPB { // indicate that whether the segments key bounds is truncated optional bool segments_key_bounds_truncated = 55; + // rows count for each segment + repeated int64 num_segment_rows = 56; + // For cloud // for data recycling optional int64 txn_expiration = 1000; @@ -235,6 +238,9 @@ message RowsetMetaCloudPB { // indicate that whether the segments key bounds is truncated optional bool segments_key_bounds_truncated = 55; + // rows count for each segment + repeated int64 num_segment_rows = 56; + // cloud // the field is a vector, rename it repeated int64 segments_file_size = 100; diff --git a/regression-test/data/fault_injection_p0/test_ordered_compaction_num_seg_rows.out b/regression-test/data/fault_injection_p0/test_ordered_compaction_num_seg_rows.out new file mode 100644 index 00000000000000..f08573569b1aa6 --- /dev/null +++ b/regression-test/data/fault_injection_p0/test_ordered_compaction_num_seg_rows.out @@ -0,0 +1,25 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql -- +10 10 +12 12 +14 14 +20 20 +21 21 +22 22 +23 23 +24 24 +30 30 +31 31 + +-- !sql -- +10 10 +12 12 +14 14 +20 20 +21 21 +22 22 +23 23 +24 24 +30 30 +31 31 + diff --git a/regression-test/pipeline/cloud_p0/conf/be_custom.conf b/regression-test/pipeline/cloud_p0/conf/be_custom.conf index fa41a88af35c11..210e709e99fa0f 100644 --- a/regression-test/pipeline/cloud_p0/conf/be_custom.conf +++ b/regression-test/pipeline/cloud_p0/conf/be_custom.conf @@ -52,3 +52,7 @@ enable_prefill_all_dbm_agg_cache_after_compaction=true enable_batch_get_delete_bitmap=true get_delete_bitmap_bytes_threshold=10 + +enable_segment_rows_consistency_check=true +enable_segment_rows_check_core=true +fail_when_segment_rows_not_in_rowset_meta=true diff --git a/regression-test/pipeline/cloud_p1/conf/be_custom.conf b/regression-test/pipeline/cloud_p1/conf/be_custom.conf index aed4d69efbf704..749be9f2d092e5 100644 --- a/regression-test/pipeline/cloud_p1/conf/be_custom.conf +++ b/regression-test/pipeline/cloud_p1/conf/be_custom.conf @@ -37,3 +37,6 @@ enable_table_size_correctness_check=true enable_write_index_searcher_cache=true large_cumu_compaction_task_min_thread_num=3 enable_prefill_all_dbm_agg_cache_after_compaction=true + +enable_segment_rows_consistency_check=true +enable_segment_rows_check_core=true diff --git a/regression-test/pipeline/nonConcurrent/conf/be.conf b/regression-test/pipeline/nonConcurrent/conf/be.conf index d8e4bbb6e35091..afa9475eef2295 100644 --- a/regression-test/pipeline/nonConcurrent/conf/be.conf +++ b/regression-test/pipeline/nonConcurrent/conf/be.conf @@ -89,4 +89,5 @@ large_cumu_compaction_task_min_thread_num=3 enable_parquet_page_index=true enable_graceful_exit_check=true - +enable_segment_rows_consistency_check=true +enable_segment_rows_check_core=true diff --git a/regression-test/pipeline/p0/conf/be.conf b/regression-test/pipeline/p0/conf/be.conf index 01df493f002f37..633667d0a9fdeb 100644 --- 
a/regression-test/pipeline/p0/conf/be.conf +++ b/regression-test/pipeline/p0/conf/be.conf @@ -92,4 +92,6 @@ enable_graceful_exit_check=true enable_prefill_all_dbm_agg_cache_after_compaction=true enable_fetch_rowsets_from_peer_replicas = true - +enable_segment_rows_consistency_check=true +enable_segment_rows_check_core=true +fail_when_segment_rows_not_in_rowset_meta=true diff --git a/regression-test/pipeline/p0/conf/regression-conf.groovy b/regression-test/pipeline/p0/conf/regression-conf.groovy index e35ec1847c08ef..701b26c8e4fb7f 100644 --- a/regression-test/pipeline/p0/conf/regression-conf.groovy +++ b/regression-test/pipeline/p0/conf/regression-conf.groovy @@ -79,6 +79,7 @@ excludeDirectories = "000_the_start_sentinel_do_not_touch," + // keep this line "external_table_p0/remote_doris," + // ubsan issue, need to investigate "workload_manager_p1," + "plsql_p0," + // plsql is not developped any more, add by sk + "restore_p0," + "variant_p0/nested," + "variant_p0/nested/sql," + "zzz_the_end_sentinel_do_not_touch"// keep this line as the last line diff --git a/regression-test/pipeline/p1/conf/be.conf b/regression-test/pipeline/p1/conf/be.conf index 4d02df140fbe98..b0fa16b867fe7f 100644 --- a/regression-test/pipeline/p1/conf/be.conf +++ b/regression-test/pipeline/p1/conf/be.conf @@ -77,4 +77,5 @@ enable_graceful_exit_check=true enable_prefill_all_dbm_agg_cache_after_compaction=true - +enable_segment_rows_consistency_check=true +enable_segment_rows_check_core=true diff --git a/regression-test/suites/fault_injection_p0/test_ordered_compaction_num_seg_rows.groovy b/regression-test/suites/fault_injection_p0/test_ordered_compaction_num_seg_rows.groovy new file mode 100644 index 00000000000000..0fb13db1f060e3 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_ordered_compaction_num_seg_rows.groovy @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions + +suite("test_ordered_compaction_num_seg_rows","nonConcurrent") { + if (isCloudMode()) { + return + } + + def custoBeConfig = [ + ordered_data_compaction_min_segment_size : 1, + enable_ordered_data_compaction: true + ] + setBeConfigTemporary(custoBeConfig) { + + String backend_id; + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + backend_id = backendId_to_backendIP.keySet()[0] + def (code1, out1, err1) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id)) + + logger.info("Show config: code=" + code1 + ", out=" + out1 + ", err=" + err1) + assert code1 == 0 + + + def tableName = "test_ordered_compaction_num_seg_rows" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k` int , + `v` int , + ) engine=olap + duplicate KEY(k) + DISTRIBUTED BY HASH(`k`) BUCKETS 1 + properties( + "replication_num" = "1", + "disable_auto_compaction" = "true") + """ + + sql """ INSERT INTO ${tableName} VALUES (10,10),(12,12),(14,14)""" + sql """ INSERT INTO ${tableName} VALUES (20,20),(21,21),(22,22),(23,23),(24,24)""" + sql """ INSERT 
INTO ${tableName} VALUES (30,30),(31,31)""" + qt_sql "select * from ${tableName} order by k;" + + def check_rs_metas = { tbl, check_func -> + def compactionUrl = sql_return_maparray("show tablets from ${tbl};").get(0).MetaUrl + def (code, out, err) = curl("GET", compactionUrl) + assert code == 0 + def jsonMeta = parseJson(out.trim()) + logger.info("==== tablet_meta.rs_metas: ${jsonMeta.rs_metas}") + check_func(jsonMeta.rs_metas) + } + + def tabletStats = sql_return_maparray("show tablets from ${tableName};") + def tabletId = tabletStats[0].TabletId + def tabletBackendId = tabletStats[0].BackendId + def tabletBackend + def backends = sql_return_maparray('show backends') + for (def be : backends) { + if (be.BackendId == tabletBackendId) { + tabletBackend = be + break; + } + } + logger.info("==== tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}"); + + GetDebugPoint().clearDebugPointsForAllBEs() + GetDebugPoint().clearDebugPointsForAllFEs() + + def do_cumu_compaction = { def tbl, def tablet_id, int start, int end -> + GetDebugPoint().enableDebugPointForAllBEs("SizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets", [tablet_id: "${tablet_id}", start_version: "${start}", end_version: "${end}"]) + trigger_and_wait_compaction(tbl, "cumulative") + GetDebugPoint().disableDebugPointForAllBEs("SizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets") + } + + try { + // [2-2],[3-3],[4-4] -> [2,4] + do_cumu_compaction(tableName, tabletId, 2, 4) + qt_sql "select * from ${tableName} order by k;" + + check_rs_metas(tableName, {def rowsets -> + assert rowsets.size() == 2 + def num_segment_rows = rowsets[1].num_segment_rows + logger.info("==== num_segment_rows: ${num_segment_rows}") + assert num_segment_rows.size() == 3 + assert num_segment_rows[0] == 3 + assert num_segment_rows[1] == 5 + assert num_segment_rows[2] == 2 + }) + + } catch (Exception e) { + logger.info(e.getMessage()) + throw e + } 
finally { + GetDebugPoint().clearDebugPointsForAllBEs() + GetDebugPoint().clearDebugPointsForAllFEs() + } + } +}