Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
1 change: 0 additions & 1 deletion be/src/common/consts.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ const std::string ROW_STORE_COL = "__DORIS_ROW_STORE_COL__";
const std::string DYNAMIC_COLUMN_NAME = "__DORIS_DYNAMIC_COL__";
const std::string PARTIAL_UPDATE_AUTO_INC_COL = "__PARTIAL_UPDATE_AUTO_INC_COLUMN__";
const std::string VIRTUAL_COLUMN_PREFIX = "__DORIS_VIRTUAL_COL__";
const std::string SPARSE_COLUMN_PATH = "__DORIS_VARIANT_SPARSE__";

/// The maximum precision representable by a 4-byte decimal (Decimal4Value)
constexpr int MAX_DECIMAL32_PRECISION = 9;
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,8 @@ struct OlapReaderStatistics {
int64_t variant_subtree_leaf_iter_count = 0;
int64_t variant_subtree_hierarchical_iter_count = 0;
int64_t variant_subtree_sparse_iter_count = 0;
int64_t variant_subtree_doc_snapshot_extract_iter_count = 0;
int64_t variant_subtree_doc_snapshot_all_iter_count = 0;
};

using ColumnId = uint32_t;
Expand Down
29 changes: 0 additions & 29 deletions be/src/olap/rowset/segment_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,6 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_
return Status::OK();
}
vectorized::Block flush_block(*block);
if (_context.write_type != DataWriteType::TYPE_COMPACTION &&
_context.tablet_schema->num_variant_columns() > 0) {
RETURN_IF_ERROR(_parse_variant_columns(flush_block));
}
bool no_compression = flush_block.bytes() <= config::segment_compression_threshold_kb * 1024;
if (config::enable_vertical_segment_writer) {
std::unique_ptr<segment_v2::VerticalSegmentWriter> writer;
Expand All @@ -85,31 +81,6 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_
return Status::OK();
}

// Parses every VARIANT-typed column of `block` in place.
//
// Returns Status::OK() without touching the block when it has no rows or
// contains no VARIANT column; otherwise returns the first error reported by
// vectorized::schema_util::parse_variant_columns.
Status SegmentFlusher::_internal_parse_variant_columns(vectorized::Block& block) {
    // An empty block has nothing to parse.
    if (block.rows() == 0) {
        return Status::OK();
    }

    // Gather the positions of all columns whose primitive type is VARIANT.
    std::vector<int> variant_positions;
    for (int pos = 0; pos < block.columns(); ++pos) {
        if (block.get_by_position(pos).type->get_primitive_type() == TYPE_VARIANT) {
            variant_positions.push_back(pos);
        }
    }
    if (variant_positions.empty()) {
        return Status::OK();
    }

    // The nested-flattening behavior is taken from the tablet schema.
    vectorized::ParseConfig parse_config;
    parse_config.enable_flatten_nested = _context.tablet_schema->variant_flatten_nested();
    RETURN_IF_ERROR(vectorized::schema_util::parse_variant_columns(block, variant_positions,
                                                                   parse_config));
    return Status::OK();
}

Status SegmentFlusher::close() {
RETURN_IF_ERROR(_seg_files.close());
RETURN_IF_ERROR(_idx_files.finish_close());
Expand Down
5 changes: 0 additions & 5 deletions be/src/olap/rowset/segment_creator.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,6 @@ class SegmentFlusher {
Status create_writer(std::unique_ptr<SegmentFlusher::Writer>& writer, uint32_t segment_id);

private:
// Exception-guarded entry point for variant parsing: forwards `block` to
// _internal_parse_variant_columns inside RETURN_IF_CATCH_EXCEPTION, so that an
// exception raised while parsing (e.g. when a memory allocation fails) is
// caught by the macro rather than escaping to the caller.
// NOTE(review): the macro presumably converts the caught exception into an
// error Status -- confirm against its definition.
Status _parse_variant_columns(vectorized::Block& block) {
RETURN_IF_CATCH_EXCEPTION({ return _internal_parse_variant_columns(block); });
}
Status _internal_parse_variant_columns(vectorized::Block& block);
Status _add_rows(std::unique_ptr<segment_v2::SegmentWriter>& segment_writer,
const vectorized::Block* block, size_t row_offset, size_t row_num);
Status _add_rows(std::unique_ptr<segment_v2::VerticalSegmentWriter>& segment_writer,
Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/rowset/segment_v2/column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,11 @@ Status ColumnWriter::create_variant_writer(const ColumnWriterOptions& opts,
const TabletColumn* column, io::FileWriter* file_writer,
std::unique_ptr<ColumnWriter>* writer) {
if (column->is_extracted_column()) {
if (column->name().find(DOC_VALUE_COLUMN_PATH) != std::string::npos) {
*writer = std::make_unique<VariantCompactionDocSnapshotWriter>(
opts, column, std::unique_ptr<Field>(FieldFactory::create(*column)));
return Status::OK();
}
VLOG_DEBUG << "gen subwriter for " << column->path_info_ptr()->get_path();
*writer = std::make_unique<VariantSubcolumnWriter>(
opts, column, std::unique_ptr<Field>(FieldFactory::create(*column)));
Expand Down
22 changes: 1 addition & 21 deletions be/src/olap/rowset/segment_v2/external_col_meta_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,27 +172,7 @@ Status ExternalColMetaUtil::write_external_column_meta(
// 2) write pointers via proto fields
footer->set_col_meta_region_start(meta_region_start);

// 3) clear inline columns to enable true on-demand meta loading
// Note: footer->columns has already been pruned to only Top Level Columns in externalize_from_footer
// But for full externalization, we might want to clear them all or keep only necessary info?
// The original logic was footer->clear_columns().
// If we clear columns, the Reader needs to know how to reconstruct the schema.
// Currently, SegmentFooterPB.columns is used as the schema source if present.
// If we clear it, Reader must rely on External Meta.
// However, the Reader typically reads footer first. If columns is empty, it assumes V3 and reads external.
// So yes, we should clear it.
// But wait, in externalize_from_footer we carefully put Top Level columns back into footer->columns.
// Why? Because in previous logic, we might want to keep roots in footer?
// The previous logic: "replace Footer.columns with only the kept top-level columns".
// BUT then `write_external_column_meta` calls `footer->clear_columns()` at the end!
// So `footer->columns` will be empty anyway.
// The only reason to reconstruct `footer->columns` in `externalize_from_footer` is if `write_external_column_meta` logic depended on it.
// In my updated `write_external_column_meta`, I iterate over `all_metas` which is returned by `externalize_from_footer`.
// So I don't strictly need `footer->columns` to be correct in between.
// However, strictly following protocol: `externalize_from_footer` modifies footer to reflect "logical" columns (Top Level).
// And then `write_external_column_meta` finalizes it by clearing them and setting pointers.

footer->clear_columns();
// Note: footer->columns has already been pruned in externalize_from_footer
return Status::OK();
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ Status Segment::new_default_iterator(const TabletColumn& tablet_column,
Status Segment::new_column_iterator(const TabletColumn& tablet_column,
std::unique_ptr<ColumnIterator>* iter,
const StorageReadOptions* opt,
const std::unordered_map<int32_t, PathToSparseColumnCacheUPtr>*
const std::unordered_map<int32_t, PathToBinaryColumnCacheUPtr>*
variant_sparse_column_cache) {
if (opt->runtime_state != nullptr) {
_be_exec_version = opt->runtime_state->be_exec_version();
Expand All @@ -675,7 +675,7 @@ Status Segment::new_column_iterator(const TabletColumn& tablet_column,
}
if (reader->get_meta_type() == FieldType::OLAP_FIELD_TYPE_VARIANT) {
// If sparse_column_cache_ptr is nullptr, the sparse column cache is not used.
PathToSparseColumnCache* sparse_column_cache_ptr = nullptr;
PathToBinaryColumnCache* sparse_column_cache_ptr = nullptr;
if (variant_sparse_column_cache) {
auto it = variant_sparse_column_cache->find(unique_id);
if (it != variant_sparse_column_cache->end()) {
Expand Down
7 changes: 6 additions & 1 deletion be/src/olap/rowset/segment_v2/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ using SparseColumnCacheSPtr = std::shared_ptr<SparseColumnCache>;
using PathToSparseColumnCache = std::unordered_map<std::string, SparseColumnCacheSPtr>;
using PathToSparseColumnCacheUPtr = std::unique_ptr<PathToSparseColumnCache>;

// Forward declaration of the binary column cache entry plus the alias types
// built on it:
//   - BinaryColumnCacheSPtr:       shared ownership of a single cache entry.
//   - PathToBinaryColumnCache:     string-keyed map of cache entries
//                                  (presumably keyed by variant path -- confirm).
//   - PathToBinaryColumnCacheUPtr: uniquely owned map of such entries.
struct BinaryColumnCache;
using BinaryColumnCacheSPtr = std::shared_ptr<BinaryColumnCache>;
using PathToBinaryColumnCache = std::unordered_map<std::string, BinaryColumnCacheSPtr>;
using PathToBinaryColumnCacheUPtr = std::unique_ptr<PathToBinaryColumnCache>;

// A Segment is used to represent a segment in memory format. When segment is
// generated, it won't be modified, so this struct aimed to help read operation.
// It will prepare all ColumnReader to create ColumnIterator as needed.
Expand Down Expand Up @@ -118,7 +123,7 @@ class Segment : public std::enable_shared_from_this<Segment>, public MetadataAdd
// If variant_sparse_column_cache is nullptr, the sparse column cache is not used.
Status new_column_iterator(const TabletColumn& tablet_column,
std::unique_ptr<ColumnIterator>* iter, const StorageReadOptions* opt,
const std::unordered_map<int32_t, PathToSparseColumnCacheUPtr>*
const std::unordered_map<int32_t, PathToBinaryColumnCacheUPtr>*
variant_sparse_column_cache = nullptr);

Status new_index_iterator(const TabletColumn& tablet_column, const TabletIndex* index_meta,
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ Status SegmentIterator::_init_impl(const StorageReadOptions& opts) {
if (int32_t uid = col->get_unique_id(); !_variant_sparse_column_cache.contains(uid)) {
DCHECK(uid >= 0);
_variant_sparse_column_cache.emplace(uid,
std::make_unique<PathToSparseColumnCache>());
std::make_unique<PathToBinaryColumnCache>());
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/segment_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ class SegmentIterator : public RowwiseIterator {
IndexQueryContextPtr _index_query_context;

// key is column uid, value is the sparse column cache
std::unordered_map<int32_t, PathToSparseColumnCacheUPtr> _variant_sparse_column_cache;
std::unordered_map<int32_t, PathToBinaryColumnCacheUPtr> _variant_sparse_column_cache;

bool _find_condition_cache = false;
std::shared_ptr<std::vector<bool>> _condition_cache;
Expand Down
13 changes: 13 additions & 0 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
#include "olap/rowset/segment_v2/page_io.h"
#include "olap/rowset/segment_v2/page_pointer.h"
#include "olap/rowset/segment_v2/variant/variant_ext_meta_writer.h"
#include "olap/rowset/segment_v2/variant/variant_util.h"
#include "olap/rowset/segment_v2/variant_stats_calculator.h"
#include "olap/segment_loader.h"
#include "olap/short_key_index.h"
Expand Down Expand Up @@ -530,6 +531,12 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
for (auto i : including_cids) {
full_block.replace_by_position(i, block->get_by_position(input_id++).column);
}

if (_opts.rowset_ctx->write_type != DataWriteType::TYPE_COMPACTION &&
_tablet_schema->num_variant_columns() > 0) {
RETURN_IF_ERROR(
variant_util::parse_variant_columns(full_block, *_tablet_schema, including_cids));
}
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
&full_block, row_pos, num_rows, including_cids));

Expand Down Expand Up @@ -704,6 +711,12 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po
_serialize_block_to_row_column(*block);
}

if (_opts.rowset_ctx->write_type != DataWriteType::TYPE_COMPACTION &&
_tablet_schema->num_variant_columns() > 0) {
RETURN_IF_ERROR(variant_util::parse_variant_columns(const_cast<vectorized::Block&>(*block),
*_tablet_schema, _column_ids));
}

_olap_data_convertor->set_source_content(block, row_pos, num_rows);

// find all row pos for short key indexes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ namespace doris::segment_v2 {
#include "common/compile_check_begin.h"

// Base class for sparse column processors with common functionality
class BaseSparseColumnProcessor : public ColumnIterator {
class BaseBinaryColumnProcessor : public ColumnIterator {
protected:
const StorageReadOptions* _read_opts;
SparseColumnCacheSPtr _sparse_column_cache;
BinaryColumnCacheSPtr _sparse_column_cache;
// Pure virtual method for processing data when existing sparse columns are encountered (to be implemented by subclasses)
virtual void _process_data_with_existing_sparse_column(vectorized::MutableColumnPtr& dst,
size_t num_rows) = 0;
Expand All @@ -68,7 +68,7 @@ class BaseSparseColumnProcessor : public ColumnIterator {
size_t num_rows) = 0;

public:
BaseSparseColumnProcessor(SparseColumnCacheSPtr sparse_column_cache,
BaseBinaryColumnProcessor(BinaryColumnCacheSPtr sparse_column_cache,
const StorageReadOptions* opts)
: _read_opts(opts), _sparse_column_cache(std::move(sparse_column_cache)) {}

Expand Down Expand Up @@ -99,7 +99,7 @@ class BaseSparseColumnProcessor : public ColumnIterator {

SCOPED_RAW_TIMER(&_read_opts->stats->variant_fill_path_from_sparse_column_timer_ns);
const auto& offsets =
assert_cast<const vectorized::ColumnMap&>(*_sparse_column_cache->sparse_column)
assert_cast<const vectorized::ColumnMap&>(*_sparse_column_cache->binary_column)
.get_offsets();
if (offsets.back() == offsets[-1]) {
// no sparse column in this batch
Expand All @@ -113,11 +113,11 @@ class BaseSparseColumnProcessor : public ColumnIterator {
};

// Implementation for path extraction processor
class SparseColumnExtractIterator : public BaseSparseColumnProcessor {
class SparseColumnExtractIterator : public BaseBinaryColumnProcessor {
public:
SparseColumnExtractIterator(std::string_view path, SparseColumnCacheSPtr sparse_column_cache,
SparseColumnExtractIterator(std::string_view path, BinaryColumnCacheSPtr sparse_column_cache,
const StorageReadOptions* opts)
: BaseSparseColumnProcessor(std::move(sparse_column_cache), opts), _path(path) {}
: BaseBinaryColumnProcessor(std::move(sparse_column_cache), opts), _path(path) {}

// Batch processing using template method
Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override {
Expand Down Expand Up @@ -157,10 +157,11 @@ class SparseColumnExtractIterator : public BaseSparseColumnProcessor {
nullable_column ? &nullable_column->get_null_map_data() : nullptr;
vectorized::ColumnVariant::fill_path_column_from_sparse_data(
*var.get_subcolumn({}) /*root*/, null_map, StringRef {_path.data(), _path.size()},
_sparse_column_cache->sparse_column->get_ptr(), 0,
_sparse_column_cache->sparse_column->size());
var.incr_num_rows(_sparse_column_cache->sparse_column->size());
_sparse_column_cache->binary_column->get_ptr(), 0,
_sparse_column_cache->binary_column->size());
var.incr_num_rows(_sparse_column_cache->binary_column->size());
var.get_sparse_column()->assume_mutable()->resize(var.rows());
var.get_doc_value_column()->assume_mutable()->resize(var.rows());
ENABLE_CHECK_CONSISTENCY(&var);
}

Expand Down
Loading