Skip to content

Commit c819c43

Browse files
committed
fix
1 parent da0dafa commit c819c43

File tree

23 files changed

+8734
-74
lines changed

23 files changed

+8734
-74
lines changed

be/src/olap/rowset/segment_creator.cpp

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,10 +103,38 @@ Status SegmentFlusher::_internal_parse_variant_columns(vectorized::Block& block)
103103
return Status::OK();
104104
}
105105

106-
vectorized::ParseConfig config;
107-
config.enable_flatten_nested = _context.tablet_schema->variant_flatten_nested();
106+
std::vector<vectorized::ParseConfig> configs(variant_column_pos.size());
107+
for (size_t i = 0; i < variant_column_pos.size(); ++i) {
108+
configs[i].enable_flatten_nested = _context.tablet_schema->variant_flatten_nested();
109+
const auto& column = _context.tablet_schema->column(variant_column_pos[i]);
110+
if (column.is_variant_type()) {
111+
// enable doc snapshot mode
112+
if (column.variant_enable_doc_snapshot_mode()) {
113+
// if has schema template, no need to parse to doc snapshot, when writing data, we will parse to doc snapshot
114+
if (column.get_sub_columns().empty()) {
115+
configs[i].parse_to_doc_snapshot = true;
116+
} else {
117+
configs[i].parse_to_subcolumns = false;
118+
}
119+
120+
// if min rows is greater than 0, no need to parse to subcolumns
121+
// when compaction row size is greater than min rows, parse to subcolumns
122+
if (column.variant_doc_snapshot_min_rows() > 0) {
123+
configs[i].parse_to_subcolumns = false;
124+
} else {
125+
configs[i].parse_to_subcolumns = true;
126+
}
127+
} else {
128+
// default: only parse to subcolumns
129+
configs[i].parse_to_subcolumns = true;
130+
configs[i].parse_to_doc_snapshot = false;
131+
}
132+
} else {
133+
return Status::InternalError("column is not variant type, column name: {}", column.name());
134+
}
135+
}
108136
RETURN_IF_ERROR(
109-
vectorized::schema_util::parse_variant_columns(block, variant_column_pos, config));
137+
vectorized::schema_util::parse_variant_columns(block, variant_column_pos, configs));
110138
return Status::OK();
111139
}
112140

be/src/olap/rowset/segment_v2/segment_writer.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ struct SegmentWriterOptions {
7676
RowsetWriterContext* rowset_ctx = nullptr;
7777
DataWriteType write_type = DataWriteType::TYPE_DEFAULT;
7878
std::shared_ptr<MowContext> mow_ctx;
79+
80+
bool
7981
};
8082

8183
using TabletSharedPtr = std::shared_ptr<Tablet>;

be/src/olap/rowset/segment_v2/variant/variant_column_writer_impl.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1068,8 +1068,9 @@ Status VariantCompactionDocSnapshotWriter::finalize() {
10681068
size_t num_rows = variant_column->size();
10691069
auto converter = std::make_unique<vectorized::OlapBlockDataConvertor>();
10701070
int column_id = 0;
1071-
int variant_doc_snapshot_min_rows = parent_column.variant_doc_snapshot_min_rows();
1072-
if (num_rows >= variant_doc_snapshot_min_rows) {
1071+
int64_t variant_doc_snapshot_min_rows = parent_column.variant_doc_snapshot_min_rows();
1072+
if (variant_doc_snapshot_min_rows >= 0
1073+
&& num_rows >= static_cast<size_t>(variant_doc_snapshot_min_rows)) {
10731074
std::unordered_map<std::string_view, vectorized::ColumnVariant::Subcolumn> subcolumns;
10741075

10751076
auto [column_key, column_value] = variant_column->get_doc_snapshot_data_paths_and_values();

be/src/olap/tablet_schema.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -233,9 +233,9 @@ class TabletColumn : public MetadataAdder<TabletColumn> {
233233

234234
bool variant_enable_doc_snapshot_mode() const { return _variant_enable_doc_snapshot_mode; }
235235

236-
int32_t variant_doc_snapshot_min_rows() const { return _variant_doc_snapshot_min_rows; }
236+
int64_t variant_doc_snapshot_min_rows() const { return _variant_doc_snapshot_min_rows; }
237237

238-
void set_variant_doc_snapshot_min_rows(int32_t variant_doc_snapshot_min_rows) {
238+
void set_variant_doc_snapshot_min_rows(int64_t variant_doc_snapshot_min_rows) {
239239
_variant_doc_snapshot_min_rows = variant_doc_snapshot_min_rows;
240240
}
241241

@@ -292,7 +292,7 @@ class TabletColumn : public MetadataAdder<TabletColumn> {
292292

293293
bool _variant_enable_doc_snapshot_mode = false;
294294

295-
int32_t _variant_doc_snapshot_min_rows = 0;
295+
int64_t _variant_doc_snapshot_min_rows = 0;
296296
};
297297

298298
bool operator==(const TabletColumn& a, const TabletColumn& b);

be/src/vec/common/schema_util.cpp

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,7 @@ Status get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
530530
}
531531

532532
Status _parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
533-
const ParseConfig& config) {
533+
const std::vector<ParseConfig>& configs) {
534534
for (int i = 0; i < variant_pos.size(); ++i) {
535535
auto column_ref = block.get_by_position(variant_pos[i]).column;
536536
bool is_nullable = column_ref->is_nullable();
@@ -544,13 +544,11 @@ Status _parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
544544

545545
MutableColumnPtr variant_column;
546546
if (var.is_doc_snapshot_mode()) {
547-
LOG(INFO) << "parse doc snapshot variant column";
548547
// doc snapshot mode, we need to parse the doc snapshot column
549-
parse_binary_to_variant(var, config);
548+
parse_binary_to_variant(var);
550549
continue;
551550
}
552551
if (!var.is_scalar_variant()) {
553-
LOG(INFO) << "already parsed variant column";
554552
// already parsed
555553
continue;
556554
}
@@ -580,7 +578,7 @@ Status _parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
580578
if (scalar_root_column->is_column_string()) {
581579
variant_column = ColumnVariant::create(0);
582580
parse_json_to_variant(*variant_column.get(),
583-
assert_cast<const ColumnString&>(*scalar_root_column), config);
581+
assert_cast<const ColumnString&>(*scalar_root_column), configs[i]);
584582
} else {
585583
// Root maybe other types rather than string like ColumnVariant(Int32).
586584
// In this case, we should finlize the root and cast to JSON type
@@ -603,10 +601,10 @@ Status _parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
603601
}
604602

605603
Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
606-
const ParseConfig& config) {
604+
const std::vector<ParseConfig>& configs) {
607605
// Parse each variant column from raw string column
608606
RETURN_IF_CATCH_EXCEPTION({
609-
return vectorized::schema_util::_parse_variant_columns(block, variant_pos, config);
607+
return vectorized::schema_util::_parse_variant_columns(block, variant_pos, configs);
610608
});
611609
}
612610

be/src/vec/common/schema_util.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ TabletColumn get_column_by_type(const vectorized::DataTypePtr& data_type, const
9898
// 2. finalize variant column to each subcolumn least commn types, default ignore sparse sub columns
9999
// 3. encode sparse sub columns
100100
Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
101-
const ParseConfig& config);
101+
const std::vector<ParseConfig>& configs);
102102

103103
// check if the tuple_paths has ambiguous paths
104104
// situation:

be/src/vec/json/json_parser.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,12 @@ enum class ExtractType {
124124

125125
struct ParseConfig {
126126
bool enable_flatten_nested = false;
127+
// parse variant column to subcolumns or doc snapshot
128+
// if parse_to_subcolumns is true, the variant column will be parsed to subcolumns
129+
// if parse_to_doc_snapshot is true, the variant column will be parsed to doc snapshot
130+
// if both are true, the variant column will be parsed to both subcolumns and doc snapshot
131+
bool parse_to_subcolumns = true;
132+
bool parse_to_doc_snapshot = false;
127133
};
128134
/// Result of parsing of a document.
129135
/// Contains all paths extracted from document

be/src/vec/json/parse2column.cpp

Lines changed: 47 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -165,35 +165,56 @@ void parse_json_to_variant(IColumn& column, const char* src, size_t length,
165165
check_paths.insert(check_paths.end(), paths.begin(), paths.end());
166166
THROW_IF_ERROR(vectorized::schema_util::check_variant_has_no_ambiguous_paths(check_paths));
167167
}
168-
for (size_t i = 0; i < paths.size(); ++i) {
169-
FieldInfo field_info;
170-
schema_util::get_field_info(values[i], &field_info);
171-
if (field_info.scalar_type_id == PrimitiveType::INVALID_TYPE) {
172-
continue;
173-
}
174-
if (column_variant.get_subcolumn(paths[i], i) == nullptr) {
175-
if (paths[i].has_nested_part()) {
176-
column_variant.add_nested_subcolumn(paths[i], field_info, old_num_rows);
177-
} else {
178-
column_variant.add_sub_column(paths[i], old_num_rows);
168+
auto [doc_snapshot_data_paths, doc_snapshot_data_values] = column_variant.get_doc_snapshot_data_paths_and_values();
169+
auto& doc_snapshot_data_offsets = column_variant.serialized_doc_snapshot_column_offsets();
170+
if (config.parse_to_subcolumns) {
171+
for (size_t i = 0; i < paths.size(); ++i) {
172+
FieldInfo field_info;
173+
schema_util::get_field_info(values[i], &field_info);
174+
if (field_info.scalar_type_id == PrimitiveType::INVALID_TYPE) {
175+
continue;
176+
}
177+
if (column_variant.get_subcolumn(paths[i], i) == nullptr) {
178+
if (paths[i].has_nested_part()) {
179+
column_variant.add_nested_subcolumn(paths[i], field_info, old_num_rows);
180+
} else {
181+
column_variant.add_sub_column(paths[i], old_num_rows);
182+
}
183+
}
184+
auto* subcolumn = column_variant.get_subcolumn(paths[i], i);
185+
if (!subcolumn) {
186+
throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "Failed to find sub column {}",
187+
paths[i].get_path());
188+
}
189+
if (subcolumn->cur_num_of_defaults() > 0) {
190+
subcolumn->insert_many_defaults(subcolumn->cur_num_of_defaults());
191+
subcolumn->reset_current_num_of_defaults();
192+
}
193+
if (subcolumn->size() != old_num_rows) {
194+
throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
195+
"subcolumn {} size missmatched, may contains duplicated entry",
196+
paths[i].get_path());
197+
}
198+
subcolumn->insert(std::move(values[i]), std::move(field_info));
199+
if (!paths[i].empty() && config.parse_to_doc_snapshot) {
200+
subcolumn->serialize_to_sparse_column(doc_snapshot_data_paths, paths[i].get_path(), doc_snapshot_data_values, old_num_rows);
179201
}
180202
}
181-
auto* subcolumn = column_variant.get_subcolumn(paths[i], i);
182-
if (!subcolumn) {
183-
throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "Failed to find sub column {}",
184-
paths[i].get_path());
185-
}
186-
if (subcolumn->cur_num_of_defaults() > 0) {
187-
subcolumn->insert_many_defaults(subcolumn->cur_num_of_defaults());
188-
subcolumn->reset_current_num_of_defaults();
189-
}
190-
if (subcolumn->size() != old_num_rows) {
191-
throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
192-
"subcolumn {} size missmatched, may contains duplicated entry",
193-
paths[i].get_path());
203+
} else {
204+
CHECK(config.parse_to_doc_snapshot);
205+
for (size_t i = 0; i < paths.size(); ++i) {
206+
FieldInfo field_info;
207+
schema_util::get_field_info(values[i], &field_info);
208+
if (paths[i].empty()) {
209+
column_variant.get_subcolumn(paths[i])->insert(std::move(values[i]), std::move(field_info));
210+
continue;
211+
}
212+
ColumnVariant::Subcolumn tmp_subcolumn(0, true);
213+
tmp_subcolumn.insert(std::move(values[i]), std::move(field_info));
214+
tmp_subcolumn.serialize_to_sparse_column(doc_snapshot_data_paths, paths[i].get_path(), doc_snapshot_data_values, 0);
194215
}
195-
subcolumn->insert(std::move(values[i]), std::move(field_info));
196216
}
217+
doc_snapshot_data_offsets.push_back(doc_snapshot_data_paths->size());
197218
// /// Insert default values to missed subcolumns.
198219
const auto& subcolumns = column_variant.get_subcolumns();
199220
for (const auto& entry : subcolumns) {
@@ -215,7 +236,6 @@ void parse_json_to_variant(IColumn& column, const char* src, size_t length,
215236
auto sparse_column = column_variant.get_sparse_column();
216237
if (sparse_column->size() == old_num_rows) {
217238
sparse_column->assume_mutable()->insert_default();
218-
column_variant.get_doc_snapshot_column()->assume_mutable()->insert_default();
219239
}
220240
#ifndef NDEBUG
221241
column_variant.check_consistency();
@@ -240,7 +260,7 @@ void parse_json_to_variant(IColumn& column, const ColumnString& raw_json_column,
240260

241261

242262
// pasre the doc snapshot column to subcolumns
243-
void parse_binary_to_variant(ColumnVariant& column_variant, const ParseConfig& config) {
263+
void parse_binary_to_variant(ColumnVariant& column_variant) {
244264
std::unordered_map<std::string_view, vectorized::ColumnVariant::Subcolumn> subcolumns;
245265

246266
auto [column_key, column_value] = column_variant.get_doc_snapshot_data_paths_and_values();

be/src/vec/json/parse2column.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,5 +43,5 @@ void parse_json_to_variant(IColumn& column, const ColumnString& raw_json_column,
4343
void parse_json_to_variant(IColumn& column, const StringRef& jsons, JsonParser* parser,
4444
const ParseConfig& config);
4545

46-
void parse_binary_to_variant(ColumnVariant& column_variant, const ParseConfig& config);
46+
void parse_binary_to_variant(ColumnVariant& column_variant);
4747
} // namespace doris::vectorized

fe/fe-common/src/main/java/org/apache/doris/catalog/ScalarType.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1050,12 +1050,12 @@ public boolean getVariantEnableDocSnapshotMode() {
10501050
return false; // The old variant type had a default value of false.
10511051
}
10521052

1053-
public int getVariantDocSnapshotMinRows() {
1053+
public long getVariantDocSnapshotMinRows() {
10541054
// In the past, variant metadata used the ScalarType type.
10551055
// Now, we use VariantType, which inherits from ScalarType, as the new metadata storage.
10561056
if (this instanceof VariantType) {
10571057
return ((VariantType) this).getVariantDocSnapshotMinRows();
10581058
}
1059-
return 0; // The old variant type had a default value of 0.
1059+
return 0L; // The old variant type had a default value of 0.
10601060
}
10611061
}

0 commit comments

Comments
 (0)