Skip to content

Commit 9441816

Browse files
committed
fix
1 parent c819c43 commit 9441816

28 files changed

+408
-249
lines changed

be/src/olap/olap_common.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,8 @@ struct OlapReaderStatistics {
448448
int64_t variant_subtree_leaf_iter_count = 0;
449449
int64_t variant_subtree_hierarchical_iter_count = 0;
450450
int64_t variant_subtree_sparse_iter_count = 0;
451+
int64_t variant_subtree_doc_snapshot_extract_iter_count = 0;
452+
int64_t variant_subtree_doc_snapshot_all_iter_count = 0;
451453
};
452454

453455
using ColumnId = uint32_t;

be/src/olap/rowset/segment_creator.cpp

Lines changed: 0 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,6 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_
6666
return Status::OK();
6767
}
6868
vectorized::Block flush_block(*block);
69-
if (_context.write_type != DataWriteType::TYPE_COMPACTION &&
70-
_context.tablet_schema->num_variant_columns() > 0) {
71-
RETURN_IF_ERROR(_parse_variant_columns(flush_block));
72-
}
7369
bool no_compression = flush_block.bytes() <= config::segment_compression_threshold_kb * 1024;
7470
if (config::enable_vertical_segment_writer) {
7571
std::unique_ptr<segment_v2::VerticalSegmentWriter> writer;
@@ -85,59 +81,6 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_
8581
return Status::OK();
8682
}
8783

88-
Status SegmentFlusher::_internal_parse_variant_columns(vectorized::Block& block) {
89-
size_t num_rows = block.rows();
90-
if (num_rows == 0) {
91-
return Status::OK();
92-
}
93-
94-
std::vector<int> variant_column_pos;
95-
for (int i = 0; i < block.columns(); ++i) {
96-
const auto& entry = block.get_by_position(i);
97-
if (entry.type->get_primitive_type() == TYPE_VARIANT) {
98-
variant_column_pos.push_back(i);
99-
}
100-
}
101-
102-
if (variant_column_pos.empty()) {
103-
return Status::OK();
104-
}
105-
106-
std::vector<vectorized::ParseConfig> configs(variant_column_pos.size());
107-
for (size_t i = 0; i < variant_column_pos.size(); ++i) {
108-
configs[i].enable_flatten_nested = _context.tablet_schema->variant_flatten_nested();
109-
const auto& column = _context.tablet_schema->column(variant_column_pos[i]);
110-
if (column.is_variant_type()) {
111-
// enable doc snapshot mode
112-
if (column.variant_enable_doc_snapshot_mode()) {
113-
// if the column has a schema template, there is no need to parse to doc snapshot here; it will be parsed to doc snapshot when the data is written
114-
if (column.get_sub_columns().empty()) {
115-
configs[i].parse_to_doc_snapshot = true;
116-
} else {
117-
configs[i].parse_to_subcolumns = false;
118-
}
119-
120-
// if min rows is greater than 0, there is no need to parse to subcolumns
121-
// when the compaction row count is greater than min rows, parse to subcolumns
122-
if (column.variant_doc_snapshot_min_rows() > 0) {
123-
configs[i].parse_to_subcolumns = false;
124-
} else {
125-
configs[i].parse_to_subcolumns = true;
126-
}
127-
} else {
128-
// default: only parse to subcolumns
129-
configs[i].parse_to_subcolumns = true;
130-
configs[i].parse_to_doc_snapshot = false;
131-
}
132-
} else {
133-
return Status::InternalError("column is not variant type, column name: {}", column.name());
134-
}
135-
}
136-
RETURN_IF_ERROR(
137-
vectorized::schema_util::parse_variant_columns(block, variant_column_pos, configs));
138-
return Status::OK();
139-
}
140-
14184
Status SegmentFlusher::close() {
14285
return _seg_files.close();
14386
}

be/src/olap/rowset/segment_creator.h

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -139,11 +139,6 @@ class SegmentFlusher {
139139
Status create_writer(std::unique_ptr<SegmentFlusher::Writer>& writer, uint32_t segment_id);
140140

141141
private:
142-
// This method catches exceptions thrown when memory allocation fails
143-
Status _parse_variant_columns(vectorized::Block& block) {
144-
RETURN_IF_CATCH_EXCEPTION({ return _internal_parse_variant_columns(block); });
145-
}
146-
Status _internal_parse_variant_columns(vectorized::Block& block);
147142
Status _add_rows(std::unique_ptr<segment_v2::SegmentWriter>& segment_writer,
148143
const vectorized::Block* block, size_t row_offset, size_t row_num);
149144
Status _add_rows(std::unique_ptr<segment_v2::VerticalSegmentWriter>& segment_writer,

be/src/olap/rowset/segment_v2/segment_writer.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
#include "olap/rowset/segment_v2/page_pointer.h"
5454
#include "olap/rowset/segment_v2/variant/variant_ext_meta_writer.h"
5555
#include "olap/rowset/segment_v2/variant_stats_calculator.h"
56+
#include "olap/rowset/segment_v2/variant/variant_util.h"
5657
#include "olap/segment_loader.h"
5758
#include "olap/short_key_index.h"
5859
#include "olap/storage_engine.h"
@@ -529,6 +530,11 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
529530
for (auto i : including_cids) {
530531
full_block.replace_by_position(i, block->get_by_position(input_id++).column);
531532
}
533+
534+
if (_opts.rowset_ctx->write_type != DataWriteType::TYPE_COMPACTION &&
535+
_tablet_schema->num_variant_columns() > 0) {
536+
RETURN_IF_ERROR(variant_util::parse_variant_columns(full_block, *_tablet_schema, including_cids));
537+
}
532538
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
533539
&full_block, row_pos, num_rows, including_cids));
534540

@@ -703,6 +709,11 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po
703709
_serialize_block_to_row_column(*block);
704710
}
705711

712+
if (_opts.rowset_ctx->write_type != DataWriteType::TYPE_COMPACTION &&
713+
_tablet_schema->num_variant_columns() > 0) {
714+
RETURN_IF_ERROR(variant_util::parse_variant_columns(const_cast<vectorized::Block&>(*block), *_tablet_schema, _column_ids));
715+
}
716+
706717
_olap_data_convertor->set_source_content(block, row_pos, num_rows);
707718

708719
// find all row pos for short key indexes

be/src/olap/rowset/segment_v2/segment_writer.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,6 @@ struct SegmentWriterOptions {
7676
RowsetWriterContext* rowset_ctx = nullptr;
7777
DataWriteType write_type = DataWriteType::TYPE_DEFAULT;
7878
std::shared_ptr<MowContext> mow_ctx;
79-
80-
bool
8179
};
8280

8381
using TabletSharedPtr = std::shared_ptr<Tablet>;

be/src/olap/rowset/segment_v2/variant/hierarchical_data_iterator.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -391,15 +391,15 @@ Status HierarchicalDataIterator::_process_sparse_column(
391391
// Case 1: subcolumn already created, append this row's value into it.
392392
if (auto it = subcolumns_from_sparse_column.find(sub_path);
393393
it != subcolumns_from_sparse_column.end()) {
394-
it->second.deserialize_from_sparse_column(&src_sparse_data_values,
394+
it->second.deserialize_from_binary_column(&src_sparse_data_values,
395395
lower_bound_index);
396396
}
397397
// Case 2: subcolumn not created yet and we still have quota → create it and insert.
398398
else if (subcolumns_from_sparse_column.size() < count) {
399399
// Initialize subcolumn with current logical row index i to align sizes.
400400
ColumnVariant::Subcolumn subcolumn(/*size*/ i, /*is_nullable*/ true,
401401
false);
402-
subcolumn.deserialize_from_sparse_column(&src_sparse_data_values,
402+
subcolumn.deserialize_from_binary_column(&src_sparse_data_values,
403403
lower_bound_index);
404404
subcolumns_from_sparse_column.emplace(sub_path, std::move(subcolumn));
405405
}
@@ -424,7 +424,7 @@ Status HierarchicalDataIterator::_process_sparse_column(
424424
// return Status::InternalError("Failed to add subcolumn for sparse column");
425425
// }
426426
}
427-
container_variant.get_subcolumn({})->deserialize_from_sparse_column(
427+
container_variant.get_subcolumn({})->deserialize_from_binary_column(
428428
&src_sparse_data_values, lower_bound_index);
429429
}
430430
}

be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -832,6 +832,9 @@ Status VariantColumnReader::_create_iterator_from_plan(
832832
}
833833
*iterator = std::make_unique<VariantDocSnapshotPathIterator>(
834834
std::move(doc_snapshot_column_caches), plan.relative_path.get_path());
835+
if (opt && opt->stats) {
836+
opt->stats->variant_subtree_doc_snapshot_extract_iter_count++;
837+
}
835838
return Status::OK();
836839
}
837840
case ReadKind::DOC_SNAPSHOT_ALL: {
@@ -849,6 +852,9 @@ Status VariantColumnReader::_create_iterator_from_plan(
849852
std::make_unique<FileColumnIterator>(_root_column_reader),
850853
plan.root->data.file_column_type);
851854
*iterator = std::make_unique<VariantDocSnapshotRootIterator>(std::move(doc_snapshot_column_caches), std::move(root_column_reader));
855+
if (opt && opt->stats) {
856+
opt->stats->variant_subtree_doc_snapshot_all_iter_count++;
857+
}
852858
return Status::OK();
853859
}
854860
default: {

be/src/olap/rowset/segment_v2/variant/variant_column_writer_impl.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1086,11 +1086,11 @@ Status VariantCompactionDocSnapshotWriter::finalize() {
10861086
if (it->second.size() != i) {
10871087
it->second.insert_many_defaults(i - it->second.size());
10881088
}
1089-
it->second.deserialize_from_sparse_column(column_value, j);
1089+
it->second.deserialize_from_binary_column(column_value, j);
10901090
} else {
10911091
vectorized::ColumnVariant::Subcolumn subcolumn {0, true, false};
10921092
subcolumn.insert_many_defaults(i);
1093-
subcolumn.deserialize_from_sparse_column(column_value, j);
1093+
subcolumn.deserialize_from_binary_column(column_value, j);
10941094
subcolumns[std::string_view(key.data, key.size)] = std::move(subcolumn);
10951095
}
10961096
}

be/src/olap/rowset/segment_v2/variant/variant_doc_snapshot_iterator.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ Status VariantDocSnapshotPathIterator::_merge_doc_snapshot_into_variant(
280280
}
281281
if (path.size() == prefix_ref.size) {
282282
has_data = true;
283-
container_variant.get_subcolumn({})->deserialize_from_sparse_column(
283+
container_variant.get_subcolumn({})->deserialize_from_binary_column(
284284
src_doc_snapshot_data_values_buckets[bucket], lower_bound_index);
285285
continue;
286286
}
@@ -290,11 +290,11 @@ Status VariantDocSnapshotPathIterator::_merge_doc_snapshot_into_variant(
290290
has_data = true;
291291

292292
if (auto it = subcolumn_map.find(path); it != subcolumn_map.end()) {
293-
it->second.deserialize_from_sparse_column(
293+
it->second.deserialize_from_binary_column(
294294
src_doc_snapshot_data_values_buckets[bucket], lower_bound_index);
295295
} else if (subcolumn_map.size() < count) {
296296
ColumnVariant::Subcolumn subcolumn(/*size*/ i, /*is_nullable*/ true, false);
297-
subcolumn.deserialize_from_sparse_column(
297+
subcolumn.deserialize_from_binary_column(
298298
src_doc_snapshot_data_values_buckets[bucket], lower_bound_index);
299299
subcolumn_map[path] = std::move(subcolumn);
300300
} else {

0 commit comments

Comments
 (0)