Skip to content

Commit e445bd7

Browse files
committed
fix
1 parent 9441816 commit e445bd7

File tree

58 files changed

+967
-442
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+967
-442
lines changed

be/src/olap/rowset/segment_v2/segment_writer.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@
5252
#include "olap/rowset/segment_v2/page_io.h"
5353
#include "olap/rowset/segment_v2/page_pointer.h"
5454
#include "olap/rowset/segment_v2/variant/variant_ext_meta_writer.h"
55-
#include "olap/rowset/segment_v2/variant_stats_calculator.h"
5655
#include "olap/rowset/segment_v2/variant/variant_util.h"
56+
#include "olap/rowset/segment_v2/variant_stats_calculator.h"
5757
#include "olap/segment_loader.h"
5858
#include "olap/short_key_index.h"
5959
#include "olap/storage_engine.h"
@@ -533,7 +533,8 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
533533

534534
if (_opts.rowset_ctx->write_type != DataWriteType::TYPE_COMPACTION &&
535535
_tablet_schema->num_variant_columns() > 0) {
536-
RETURN_IF_ERROR(variant_util::parse_variant_columns(full_block, *_tablet_schema, including_cids));
536+
RETURN_IF_ERROR(
537+
variant_util::parse_variant_columns(full_block, *_tablet_schema, including_cids));
537538
}
538539
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
539540
&full_block, row_pos, num_rows, including_cids));
@@ -711,7 +712,8 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po
711712

712713
if (_opts.rowset_ctx->write_type != DataWriteType::TYPE_COMPACTION &&
713714
_tablet_schema->num_variant_columns() > 0) {
714-
RETURN_IF_ERROR(variant_util::parse_variant_columns(const_cast<vectorized::Block&>(*block), *_tablet_schema, _column_ids));
715+
RETURN_IF_ERROR(variant_util::parse_variant_columns(const_cast<vectorized::Block&>(*block),
716+
*_tablet_schema, _column_ids));
715717
}
716718

717719
_olap_data_convertor->set_source_content(block, row_pos, num_rows);

be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -816,13 +816,14 @@ Status VariantColumnReader::_create_iterator_from_plan(
816816
case ReadKind::DOC_SNAPSHOT: {
817817
DCHECK(plan.doc_snapshot_buckets.size() == 1);
818818
ColumnIteratorUPtr inner_iter;
819-
RETURN_IF_ERROR(_doc_snapshot_column_readers.at(plan.doc_snapshot_buckets[0])->new_iterator(&inner_iter, nullptr));
819+
RETURN_IF_ERROR(_doc_snapshot_column_readers.at(plan.doc_snapshot_buckets[0])
820+
->new_iterator(&inner_iter, nullptr));
820821
*iterator = std::make_unique<VariantDocSnapshotCompactIterator>(std::move(inner_iter));
821822
return Status::OK();
822823
}
823824
case ReadKind::DOC_SNAPSHOT_EXTRACT: {
824825
DCHECK(plan.doc_snapshot_buckets.size() >= 1);
825-
826+
826827
std::vector<BinaryColumnCacheSPtr> doc_snapshot_column_caches;
827828
for (const auto& bucket : plan.doc_snapshot_buckets) {
828829
std::string path = DOC_SNAPSHOT_COLUMN_PATH + "." + std::to_string(bucket);
@@ -848,10 +849,11 @@ Status VariantColumnReader::_create_iterator_from_plan(
848849
std::unique_ptr<SubstreamIterator> root_column_reader;
849850
DCHECK(plan.root);
850851
root_column_reader = std::make_unique<SubstreamIterator>(
851-
plan.root->data.file_column_type->create_column(),
852-
std::make_unique<FileColumnIterator>(_root_column_reader),
853-
plan.root->data.file_column_type);
854-
*iterator = std::make_unique<VariantDocSnapshotRootIterator>(std::move(doc_snapshot_column_caches), std::move(root_column_reader));
852+
plan.root->data.file_column_type->create_column(),
853+
std::make_unique<FileColumnIterator>(_root_column_reader),
854+
plan.root->data.file_column_type);
855+
*iterator = std::make_unique<VariantDocSnapshotRootIterator>(
856+
std::move(doc_snapshot_column_caches), std::move(root_column_reader));
855857
if (opt && opt->stats) {
856858
opt->stats->variant_subtree_doc_snapshot_all_iter_count++;
857859
}

be/src/olap/rowset/segment_v2/variant/variant_column_reader.h

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -193,9 +193,7 @@ struct BinaryColumnCache {
193193
return std::equal(_rowids, _rowids + _count, rowids.get());
194194
}
195195

196-
ordinal_t get_current_ordinal() const {
197-
return binary_column_iterator->get_current_ordinal();
198-
}
196+
// Returns the current ordinal as reported by the wrapped binary_column_iterator.
ordinal_t get_current_ordinal() const { return binary_column_iterator->get_current_ordinal(); }
199197
};
200198

201199
using BinaryColumnCacheSPtr = std::shared_ptr<BinaryColumnCache>;
@@ -298,16 +296,16 @@ class VariantColumnReader : public ColumnReader {
298296
// Describe how a variant sub-path should be read. This is a logical plan only and
299297
// does not create any concrete ColumnIterator.
300298
enum class ReadKind {
301-
ROOT_FLAT, // root variant using `VariantRootColumnIterator`
302-
HIERARCHICAL, // hierarchical merge (root + subcolumns + sparse)
303-
LEAF, // direct leaf reader
304-
SPARSE_EXTRACT, // extract single path from sparse column
305-
SPARSE_MERGE, // merge subcolumns into sparse column
306-
DEFAULT_NESTED, // fill nested subcolumn using sibling nested column
307-
DEFAULT_FILL, // default iterator when path not exist
308-
DOC_SNAPSHOT, // read from doc snapshot column when compaction read
299+
ROOT_FLAT, // root variant using `VariantRootColumnIterator`
300+
HIERARCHICAL, // hierarchical merge (root + subcolumns + sparse)
301+
LEAF, // direct leaf reader
302+
SPARSE_EXTRACT, // extract single path from sparse column
303+
SPARSE_MERGE, // merge subcolumns into sparse column
304+
DEFAULT_NESTED, // fill nested subcolumn using sibling nested column
305+
DEFAULT_FILL, // default iterator when the path does not exist
306+
DOC_SNAPSHOT, // read from the doc snapshot column during compaction reads
309307
DOC_SNAPSHOT_EXTRACT, // extract single path or hierarchical from doc snapshot column when read
310-
DOC_SNAPSHOT_ALL, // read all paths from doc snapshot column when read
308+
DOC_SNAPSHOT_ALL, // read all paths from the doc snapshot column when reading
311309
};
312310

313311
struct ReadPlan {

be/src/olap/rowset/segment_v2/variant/variant_column_writer_impl.cpp

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -672,7 +672,7 @@ Status VariantColumnWriterImpl::_process_doc_snapshot_column(
672672
return Status::OK();
673673
}
674674
ptr->reconstruct_doc_snapshot_column();
675-
const int bucket_num = 128;
675+
const int bucket_num = std::max(1, _tablet_column->variant_doc_snapshot_shard_count());
676676
RETURN_IF_ERROR(
677677
_doc_snapshot_writer.init(_tablet_column, bucket_num, column_id, _opts, _opts.footer));
678678
RETURN_IF_ERROR(_doc_snapshot_writer.append_data(_tablet_column, *ptr, num_rows, converter));
@@ -972,9 +972,8 @@ Status VariantSubcolumnWriter::append_nullable(const uint8_t* null_map, const ui
972972
return Status::OK();
973973
}
974974

975-
VariantCompactionDocSnapshotWriter::VariantCompactionDocSnapshotWriter(const ColumnWriterOptions& opts,
976-
const TabletColumn* column,
977-
std::unique_ptr<Field> field)
975+
VariantCompactionDocSnapshotWriter::VariantCompactionDocSnapshotWriter(
976+
const ColumnWriterOptions& opts, const TabletColumn* column, std::unique_ptr<Field> field)
978977
: ColumnWriter(std::move(field), opts.meta->is_nullable(), opts.meta) {
979978
_opts = opts;
980979
_tablet_column = column;
@@ -1054,13 +1053,13 @@ Status VariantCompactionDocSnapshotWriter::write_bloom_filter_index() {
10541053
RETURN_IF_ERROR(_doc_snapshot_column_writer->write_bloom_filter_index());
10551054
return Status::OK();
10561055
}
1057-
Status VariantCompactionDocSnapshotWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
1058-
size_t num_rows) {
1056+
Status VariantCompactionDocSnapshotWriter::append_nullable(const uint8_t* null_map,
1057+
const uint8_t** ptr, size_t num_rows) {
10591058
RETURN_IF_ERROR(append_data(ptr, num_rows));
10601059
return Status::OK();
10611060
}
10621061
Status VariantCompactionDocSnapshotWriter::finalize() {
1063-
auto * variant_column = assert_cast<vectorized::ColumnVariant*>(_column.get());
1062+
auto* variant_column = assert_cast<vectorized::ColumnVariant*>(_column.get());
10641063

10651064
const auto& parent_column =
10661065
_opts.rowset_ctx->tablet_schema->column_by_uid(_tablet_column->parent_unique_id());
@@ -1069,8 +1068,8 @@ Status VariantCompactionDocSnapshotWriter::finalize() {
10691068
auto converter = std::make_unique<vectorized::OlapBlockDataConvertor>();
10701069
int column_id = 0;
10711070
int64_t variant_doc_snapshot_min_rows = parent_column.variant_doc_snapshot_min_rows();
1072-
if (variant_doc_snapshot_min_rows >= 0
1073-
&& num_rows >= static_cast<size_t>(variant_doc_snapshot_min_rows)) {
1071+
if (variant_doc_snapshot_min_rows == 0 ||
1072+
num_rows >= static_cast<size_t>(variant_doc_snapshot_min_rows)) {
10741073
std::unordered_map<std::string_view, vectorized::ColumnVariant::Subcolumn> subcolumns;
10751074

10761075
auto [column_key, column_value] = variant_column->get_doc_snapshot_data_paths_and_values();
@@ -1128,9 +1127,24 @@ Status VariantCompactionDocSnapshotWriter::finalize() {
11281127
continue;
11291128
}
11301129
subcolumn.finalize();
1130+
TabletColumn tablet_column;
1131+
TabletSchema::SubColumnInfo sub_column_info;
1132+
if (vectorized::schema_util::generate_sub_column_info(
1133+
*_opts.rowset_ctx->tablet_schema, parent_column.unique_id(),
1134+
std::string(path), &sub_column_info)) {
1135+
vectorized::schema_util::inherit_column_attributes(parent_column,
1136+
sub_column_info.column);
1137+
tablet_column = std::move(sub_column_info.column);
1138+
_subcolumns_indexes[column_id] = std::move(sub_column_info.indexes);
1139+
} else {
1140+
tablet_column = generate_column_info(path, subcolumn);
1141+
const auto& indexes =
1142+
_opts.rowset_ctx->tablet_schema->inverted_indexs(parent_column.unique_id());
1143+
vectorized::schema_util::inherit_index(indexes, _subcolumns_indexes[column_id],
1144+
tablet_column);
1145+
}
11311146

11321147
int current_column_id = column_id++;
1133-
TabletColumn tablet_column = generate_column_info(path, subcolumn);
11341148
int64_t none_null_value_size = subcolumn.get_non_null_value_size();
11351149
vectorized::ColumnPtr current_column = subcolumn.get_finalized_column_ptr()->get_ptr();
11361150
vectorized::DataTypePtr current_type = subcolumn.get_least_common_type();
@@ -1166,25 +1180,24 @@ Status VariantCompactionDocSnapshotWriter::finalize() {
11661180
TabletColumn doc_snapshot_column =
11671181
vectorized::schema_util::create_doc_snapshot_column(parent_column, bucket_value);
11681182
_init_column_meta(_opts.meta, column_id, doc_snapshot_column, _opts.compression_type);
1169-
RETURN_IF_ERROR(ColumnWriter::create_map_writer(_opts, &doc_snapshot_column,
1170-
_opts.file_writer, &_doc_snapshot_column_writer));
1183+
RETURN_IF_ERROR(ColumnWriter::create_map_writer(_opts, &doc_snapshot_column, _opts.file_writer,
1184+
&_doc_snapshot_column_writer));
11711185
RETURN_IF_ERROR(_doc_snapshot_column_writer->init());
11721186

11731187
// convert doc snapshot column data from engine format to storage layer format
11741188
converter->add_column_data_convertor(doc_snapshot_column);
11751189
// Convert MutableColumnPtr to ColumnPtr by creating a shared pointer from the raw pointer
11761190
// The ownership is maintained by _column, so this is safe
1177-
RETURN_IF_ERROR(converter->set_source_content_with_specifid_column({variant_column->get_doc_snapshot_column(), nullptr, ""}, 0,
1178-
num_rows, column_id));
1191+
RETURN_IF_ERROR(converter->set_source_content_with_specifid_column(
1192+
{variant_column->get_doc_snapshot_column(), nullptr, ""}, 0, num_rows, column_id));
11791193
auto [status, column] = converter->convert_column_data(column_id);
11801194
RETURN_IF_ERROR(status);
1181-
RETURN_IF_ERROR(
1182-
_doc_snapshot_column_writer->append(column->get_nullmap(), column->get_data(), num_rows));
1195+
RETURN_IF_ERROR(_doc_snapshot_column_writer->append(column->get_nullmap(), column->get_data(),
1196+
num_rows));
11831197
converter->clear_source_content(column_id);
11841198

11851199
_opts.meta->set_num_rows(num_rows);
11861200

1187-
11881201
auto [column_key, column_value] = variant_column->get_doc_snapshot_data_paths_and_values();
11891202
const auto& column_offsets = variant_column->serialized_doc_snapshot_column_offsets();
11901203
std::map<StringRef, uint32_t> column_stats;

be/src/olap/rowset/segment_v2/variant/variant_column_writer_impl.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,9 @@ class VariantColumnWriterImpl {
187187

188188
class VariantCompactionDocSnapshotWriter : public ColumnWriter {
189189
public:
190-
explicit VariantCompactionDocSnapshotWriter(const ColumnWriterOptions& opts, const TabletColumn* column,
191-
std::unique_ptr<Field> field);
190+
explicit VariantCompactionDocSnapshotWriter(const ColumnWriterOptions& opts,
191+
const TabletColumn* column,
192+
std::unique_ptr<Field> field);
192193

193194
~VariantCompactionDocSnapshotWriter() override = default;
194195

0 commit comments

Comments
 (0)