Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/cloud/pb_convert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, const RowsetMetaPB& in)
out->set_txn_expiration(in.txn_expiration());
out->set_segments_overlap_pb(in.segments_overlap_pb());
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
out->mutable_num_segment_rows()->CopyFrom(in.num_segment_rows());
out->mutable_segments_file_size()->CopyFrom(in.segments_file_size());
out->set_index_id(in.index_id());
if (in.has_schema_version()) {
Expand Down Expand Up @@ -151,6 +152,7 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) {
out->set_txn_expiration(in.txn_expiration());
out->set_segments_overlap_pb(in.segments_overlap_pb());
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
out->mutable_num_segment_rows()->Swap(in.mutable_num_segment_rows());
out->mutable_segments_file_size()->Swap(in.mutable_segments_file_size());
out->set_index_id(in.index_id());
if (in.has_schema_version()) {
Expand Down Expand Up @@ -234,6 +236,7 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in)
out->set_txn_expiration(in.txn_expiration());
out->set_segments_overlap_pb(in.segments_overlap_pb());
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
out->mutable_num_segment_rows()->CopyFrom(in.num_segment_rows());
out->mutable_segments_file_size()->CopyFrom(in.segments_file_size());
out->set_index_id(in.index_id());
if (in.has_schema_version()) {
Expand Down Expand Up @@ -305,6 +308,7 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in) {
out->set_txn_expiration(in.txn_expiration());
out->set_segments_overlap_pb(in.segments_overlap_pb());
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
out->mutable_num_segment_rows()->Swap(in.mutable_num_segment_rows());
out->mutable_segments_file_size()->Swap(in.mutable_segments_file_size());
out->set_index_id(in.index_id());
if (in.has_schema_version()) {
Expand Down
6 changes: 6 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,12 @@ DEFINE_mInt32(trash_file_expire_time_sec, "0");
// modify them upon necessity
DEFINE_Int32(min_file_descriptor_number, "60000");
DEFINE_mBool(disable_segment_cache, "false");
// Enable checking segment rows consistency between rowset meta and segment footer
DEFINE_mBool(enable_segment_rows_consistency_check, "false");
DEFINE_mBool(enable_segment_rows_check_core, "false");
// ATTENTION: For test only. In test environment, there are no historical data,
// so all rowset meta should have segment rows info.
DEFINE_mBool(fail_when_segment_rows_not_in_rowset_meta, "false");
DEFINE_String(row_cache_mem_limit, "20%");

// Cache for storage page size
Expand Down
6 changes: 6 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,12 @@ DECLARE_mInt32(trash_file_expire_time_sec);
// modify them upon necessity
DECLARE_Int32(min_file_descriptor_number);
DECLARE_mBool(disable_segment_cache);
// Enable checking segment rows consistency between rowset meta and segment footer
DECLARE_mBool(enable_segment_rows_consistency_check);
DECLARE_mBool(enable_segment_rows_check_core);
// ATTENTION: For test only. In test environment, there are no historical data,
// so all rowset meta should have segment rows info.
DECLARE_mBool(fail_when_segment_rows_not_in_rowset_meta);
DECLARE_String(row_cache_mem_limit);

// Cache for storage page size
Expand Down
6 changes: 6 additions & 0 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ Status CompactionMixin::do_compact_ordered_rowsets() {
auto seg_id = 0;
bool segments_key_bounds_truncated {false};
std::vector<KeyBoundsPB> segment_key_bounds;
std::vector<uint32_t> num_segment_rows;
for (auto rowset : _input_rowsets) {
RETURN_IF_ERROR(rowset->link_files_to(tablet()->tablet_path(),
_output_rs_writer->rowset_id(), seg_id));
Expand All @@ -348,6 +349,10 @@ Status CompactionMixin::do_compact_ordered_rowsets() {
std::vector<KeyBoundsPB> key_bounds;
RETURN_IF_ERROR(rowset->get_segments_key_bounds(&key_bounds));
segment_key_bounds.insert(segment_key_bounds.end(), key_bounds.begin(), key_bounds.end());
std::vector<uint32_t> input_segment_rows;
rowset->get_num_segment_rows(&input_segment_rows);
num_segment_rows.insert(num_segment_rows.end(), input_segment_rows.begin(),
input_segment_rows.end());
}
// build output rowset
RowsetMetaSharedPtr rowset_meta = std::make_shared<RowsetMeta>();
Expand All @@ -361,6 +366,7 @@ Status CompactionMixin::do_compact_ordered_rowsets() {
rowset_meta->set_rowset_state(VISIBLE);
rowset_meta->set_segments_key_bounds_truncated(segments_key_bounds_truncated);
rowset_meta->set_segments_key_bounds(segment_key_bounds);
rowset_meta->set_num_segment_rows(num_segment_rows);

_output_rowset = _output_rs_writer->manual_build(rowset_meta);

Expand Down
6 changes: 5 additions & 1 deletion be/src/olap/parallel_scanner_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ Status ParallelScannerBuilder::_build_scanners_by_per_segment(std::list<ScannerS
Status ParallelScannerBuilder::_load() {
_total_rows = 0;
size_t idx = 0;
bool enable_segment_cache = _state->query_options().__isset.enable_segment_cache
? _state->query_options().enable_segment_cache
: true;
for (auto&& [tablet, version] : _tablets) {
const auto tablet_id = tablet->tablet_id();
_all_read_sources[tablet_id] = _read_sources[idx];
Expand All @@ -233,7 +236,8 @@ Status ParallelScannerBuilder::_load() {

auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
std::vector<uint32_t> segment_rows;
RETURN_IF_ERROR(beta_rowset->get_segment_num_rows(&segment_rows, &_builder_stats));
RETURN_IF_ERROR(beta_rowset->get_segment_num_rows(&segment_rows, enable_segment_cache,
&_builder_stats));
auto segment_count = rowset->num_segments();
for (int64_t i = 0; i != segment_count; i++) {
_all_segments_rows[rowset_id].emplace_back(segment_rows[i]);
Expand Down
97 changes: 86 additions & 11 deletions be/src/olap/rowset/beta_rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@
#include <utility>

#include "beta_rowset.h"
#include "cloud/config.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "cpp/sync_point.h"
#include "io/fs/file_reader.h"
#include "io/fs/file_system.h"
#include "io/fs/local_file_system.h"
Expand Down Expand Up @@ -71,24 +73,97 @@ Status BetaRowset::init() {
return Status::OK(); // no op
}

namespace {
Status load_segment_rows_from_footer(BetaRowsetSharedPtr rowset,
std::vector<uint32_t>* segment_rows, bool enable_segment_cache,
OlapReaderStatistics* read_stats) {
SegmentCacheHandle segment_cache_handle;
RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
rowset, &segment_cache_handle, enable_segment_cache, false, read_stats));
for (const auto& segment : segment_cache_handle.get_segments()) {
segment_rows->emplace_back(segment->num_rows());
}
return Status::OK();
}

Status check_segment_rows_consistency(const std::vector<uint32_t>& rows_from_meta,
const std::vector<uint32_t>& rows_from_footer,
int64_t tablet_id, const std::string& rowset_id) {
DCHECK_EQ(rows_from_footer.size(), rows_from_meta.size());
for (size_t i = 0; i < rows_from_footer.size(); i++) {
if (rows_from_footer[i] != rows_from_meta[i]) {
auto msg = fmt::format(
"segment rows mismatch between rowset meta and segment footer. "
"segment index: {}, meta rows: {}, footer rows: {}, tablet={}, rowset={}",
i, rows_from_meta[i], rows_from_footer[i], tablet_id, rowset_id);
if (config::enable_segment_rows_check_core) {
CHECK(false) << msg;
}
return Status::InternalError(msg);
}
}
return Status::OK();
}
} // namespace

Status BetaRowset::get_segment_num_rows(std::vector<uint32_t>* segment_rows,
bool enable_segment_cache,
OlapReaderStatistics* read_stats) {
#ifndef BE_TEST
// `ROWSET_UNLOADING` is state for closed() called but owned by some readers.
// So here `ROWSET_UNLOADING` is allowed.
DCHECK_NE(_rowset_state_machine.rowset_state(), ROWSET_UNLOADED);

RETURN_IF_ERROR(_load_segment_rows_once.call([this, read_stats] {
#endif
RETURN_IF_ERROR(_load_segment_rows_once.call([this, enable_segment_cache, read_stats] {
auto segment_count = num_segments();
_segments_rows.resize(segment_count);
for (int64_t i = 0; i != segment_count; ++i) {
SegmentCacheHandle segment_cache_handle;
RETURN_IF_ERROR(SegmentLoader::instance()->load_segment(
std::static_pointer_cast<BetaRowset>(shared_from_this()), i,
&segment_cache_handle, false, false, read_stats));
const auto& tmp_segments = segment_cache_handle.get_segments();
_segments_rows[i] = tmp_segments[0]->num_rows();
if (segment_count == 0) {
return Status::OK();
}
return Status::OK();

if (!_rowset_meta->get_num_segment_rows().empty()) {
if (_rowset_meta->get_num_segment_rows().size() == segment_count) {
// use segment rows in rowset meta if eligible
TEST_SYNC_POINT("BetaRowset::get_segment_num_rows:use_segment_rows_from_meta");
_segments_rows.assign(_rowset_meta->get_num_segment_rows().cbegin(),
_rowset_meta->get_num_segment_rows().cend());
if (config::enable_segment_rows_consistency_check) {
// verify segment rows from meta match segment footer
std::vector<uint32_t> rows_from_footer;
auto self = std::dynamic_pointer_cast<BetaRowset>(shared_from_this());
auto load_status = load_segment_rows_from_footer(
self, &rows_from_footer, enable_segment_cache, read_stats);
if (load_status.ok()) {
return check_segment_rows_consistency(
_segments_rows, rows_from_footer, _rowset_meta->tablet_id(),
_rowset_meta->rowset_id().to_string());
}
}
return Status::OK();
} else {
auto msg = fmt::format(
"[verbose] corrupted segment rows info in rowset meta. "
"segment count: {}, segment rows size: {}, tablet={}, rowset={}",
segment_count, _rowset_meta->get_num_segment_rows().size(),
_rowset_meta->tablet_id(), _rowset_meta->rowset_id().to_string());
if (config::enable_segment_rows_check_core) {
CHECK(false) << msg;
}
LOG_EVERY_SECOND(WARNING) << msg;
}
}
if (config::fail_when_segment_rows_not_in_rowset_meta) {
CHECK(false) << "[verbose] segment rows info not found in rowset meta. tablet="
<< _rowset_meta->tablet_id()
<< ", rowset=" << _rowset_meta->rowset_id().to_string()
<< ", version=" << _rowset_meta->version()
<< ", debug_string=" << _rowset_meta->debug_string()
<< ", stack=" << Status::InternalError("error");
}
// otherwise, read it from segment footer
TEST_SYNC_POINT("BetaRowset::get_segment_num_rows:load_from_segment_footer");
auto self = std::dynamic_pointer_cast<BetaRowset>(shared_from_this());
return load_segment_rows_from_footer(self, &_segments_rows, enable_segment_cache,
read_stats);
}));
segment_rows->assign(_segments_rows.cbegin(), _segments_rows.cend());
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/beta_rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class BetaRowset final : public Rowset {
Status show_nested_index_file(rapidjson::Value* rowset_value,
rapidjson::Document::AllocatorType& allocator);

Status get_segment_num_rows(std::vector<uint32_t>* segment_rows,
Status get_segment_num_rows(std::vector<uint32_t>* segment_rows, bool enable_segment_cache,
OlapReaderStatistics* read_stats);

protected:
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/beta_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
if (_read_context->record_rowids && _read_context->rowid_conversion) {
// init segment rowid map for rowid conversion
std::vector<uint32_t> segment_rows;
RETURN_IF_ERROR(_rowset->get_segment_num_rows(&segment_rows, _stats));
RETURN_IF_ERROR(_rowset->get_segment_num_rows(&segment_rows, should_use_cache, _stats));
RETURN_IF_ERROR(_read_context->rowid_conversion->init_segment_map(rowset()->rowset_id(),
segment_rows));
}
Expand Down
23 changes: 23 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
#include <mutex>
#include <sstream>
#include <utility>
#include <vector>

// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
#include "common/cast_set.h"
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h"
Expand Down Expand Up @@ -97,6 +99,9 @@ void build_rowset_meta_with_spec_field(RowsetMeta& rowset_meta,
std::vector<KeyBoundsPB> segments_key_bounds;
spec_rowset_meta.get_segments_key_bounds(&segments_key_bounds);
rowset_meta.set_segments_key_bounds(segments_key_bounds);
std::vector<uint32_t> num_segment_rows;
spec_rowset_meta.get_num_segment_rows(&num_segment_rows);
rowset_meta.set_num_segment_rows(num_segment_rows);
}

} // namespace
Expand Down Expand Up @@ -777,6 +782,7 @@ Status BaseBetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) {
_num_segment += cast_set<int32_t>(rowset->num_segments());
// append key_bounds to current rowset
RETURN_IF_ERROR(rowset->get_segments_key_bounds(&_segments_encoded_key_bounds));
rowset->get_num_segment_rows(&_segment_num_rows);
_segments_key_bounds_truncated = rowset->rowset_meta()->is_segments_key_bounds_truncated();

// TODO update zonemap
Expand Down Expand Up @@ -956,21 +962,31 @@ Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool ch
int64_t total_data_size = 0;
int64_t total_index_size = 0;
std::vector<KeyBoundsPB> segments_encoded_key_bounds;
std::vector<uint32_t> segment_rows;
{
std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
for (const auto& itr : _segid_statistics_map) {
num_rows_written += itr.second.row_num;
total_data_size += itr.second.data_size;
total_index_size += itr.second.index_size;
segments_encoded_key_bounds.push_back(itr.second.key_bounds);
// segcompaction don't modify _segment_num_rows, so we need to get segment rows from _segid_statistics_map for load
segment_rows.push_back(cast_set<uint32_t>(itr.second.row_num));
}
}
if (segment_rows.empty()) {
// vertical compaction and linked schema change will not record segment statistics,
// it will record segment rows in _segment_num_rows
RETURN_IF_ERROR(get_segment_num_rows(&segment_rows));
}

for (auto& key_bound : _segments_encoded_key_bounds) {
segments_encoded_key_bounds.push_back(key_bound);
}
if (_segments_key_bounds_truncated.has_value()) {
rowset_meta->set_segments_key_bounds_truncated(_segments_key_bounds_truncated.value());
}
rowset_meta->set_num_segment_rows(segment_rows);
// segment key bounds are empty in old version(before version 1.2.x). So we should not modify
// the overlap property when key bounds are empty.
// for mow table with cluster keys, the overlap is used for cluster keys,
Expand All @@ -991,6 +1007,13 @@ Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool ch
"is: {}, _num_seg is: {}",
segments_encoded_key_bounds_size, segment_num);
}
if (segment_rows.size() != segment_num) {
return Status::InternalError(
"segment_rows size should equal to _num_seg, segment_rows size is: {}, "
"_num_seg is {}, tablet={}, rowset={}, txn={}",
segment_rows.size(), segment_num, _context.tablet_id,
_context.rowset_id.to_string(), _context.txn_id);
}
}

rowset_meta->set_num_segments(segment_num);
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/rowset/rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,10 @@ class Rowset : public std::enable_shared_from_this<Rowset>, public MetadataAdder
return Status::OK();
}

void get_num_segment_rows(std::vector<uint32_t>* num_segment_rows) {
_rowset_meta->get_num_segment_rows(num_segment_rows);
}

// min key of the first segment
bool first_key(std::string* min_key) {
KeyBoundsPB key_bounds;
Expand Down
16 changes: 16 additions & 0 deletions be/src/olap/rowset/rowset_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "cloud/cloud_storage_engine.h"
#include "common/logging.h"
#include "common/status.h"
#include "cpp/sync_point.h"
#include "google/protobuf/util/message_differencer.h"
#include "io/fs/encrypted_fs_factory.h"
#include "io/fs/file_system.h"
Expand Down Expand Up @@ -325,6 +326,20 @@ void RowsetMeta::merge_rowset_meta(const RowsetMeta& other) {
set_total_disk_size(data_disk_size() + index_disk_size());
set_segments_key_bounds_truncated(is_segments_key_bounds_truncated() ||
other.is_segments_key_bounds_truncated());
if (_rowset_meta_pb.num_segment_rows_size() > 0) {
if (other.num_segments() > 0) {
if (other._rowset_meta_pb.num_segment_rows_size() > 0) {
for (auto row_count : other._rowset_meta_pb.num_segment_rows()) {
_rowset_meta_pb.add_num_segment_rows(row_count);
}
} else {
// This may happen when a partial update load commits in high version doirs_be
// and publishes with new segments in low version doris_be. In this case, just clear
// all num_segment_rows.
_rowset_meta_pb.clear_num_segment_rows();
}
}
}
for (auto&& key_bound : other.get_segments_key_bounds()) {
add_segment_key_bounds(key_bound);
}
Expand All @@ -343,6 +358,7 @@ void RowsetMeta::merge_rowset_meta(const RowsetMeta& other) {
}
// In partial update the rowset schema maybe updated when table contains variant type, so we need the newest schema to be updated
// Otherwise the schema is stale and lead to wrong data read
TEST_SYNC_POINT_RETURN_WITH_VOID("RowsetMeta::merge_rowset_meta:skip_schema_merge");
if (tablet_schema()->num_variant_columns() > 0) {
// merge extracted columns
TabletSchemaSPtr merged_schema;
Expand Down
Loading
Loading