diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index d67c1957c14aa7..f1bac1c783c629 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -398,8 +398,10 @@ Status PushBrokerReader::init() { _io_ctx->query_id = &_runtime_state->query_id(); auto slot_descs = desc_tbl->get_tuple_descriptor(0)->slots(); + uint32_t idx = 0; for (auto& slot_desc : slot_descs) { _all_col_names.push_back(to_lower((slot_desc->col_name()))); + _col_name_to_block_idx.insert({to_lower(slot_desc->col_name()), idx++}); } RETURN_IF_ERROR(_init_expr_ctxes()); @@ -646,9 +648,9 @@ Status PushBrokerReader::_get_next_reader() { _io_ctx.get(), _runtime_state.get()); init_status = parquet_reader->init_reader( - _all_col_names, _push_down_exprs, _real_tuple_desc, _default_val_row_desc.get(), - _col_name_to_slot_id, &_not_single_slot_filter_conjuncts, - &_slot_id_to_filter_conjuncts, + _all_col_names, &_col_name_to_block_idx, _push_down_exprs, _real_tuple_desc, + _default_val_row_desc.get(), _col_name_to_slot_id, + &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts, vectorized::TableSchemaChangeHelper::ConstNode::get_instance(), false); _cur_reader = std::move(parquet_reader); if (!init_status.ok()) { diff --git a/be/src/olap/push_handler.h b/be/src/olap/push_handler.h index c441839763dd06..92a1fd2cfde908 100644 --- a/be/src/olap/push_handler.h +++ b/be/src/olap/push_handler.h @@ -139,6 +139,7 @@ class PushBrokerReader { // col names from _slot_descs std::vector _all_col_names; + std::unordered_map _col_name_to_block_idx; vectorized::VExprContextSPtrs _push_down_exprs; const std::unordered_map* _col_name_to_slot_id; // single slot filter conjuncts diff --git a/be/src/vec/exec/format/jni_reader.h b/be/src/vec/exec/format/jni_reader.h index f51e68f1f4ef74..325b7221d044bb 100644 --- a/be/src/vec/exec/format/jni_reader.h +++ b/be/src/vec/exec/format/jni_reader.h @@ -68,6 +68,13 @@ class JniReader : public GenericReader { return Status::OK(); } + void set_col_name_to_block_idx( + const std::unordered_map* col_name_to_block_idx) { + if (_jni_connector) { + _jni_connector->set_col_name_to_block_idx(col_name_to_block_idx); + } + } + protected: void _collect_profile_before_close() override { if (_jni_connector) { diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 86ba432ab4d738..fe4f02c87917f4 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -350,13 +350,16 @@ Status OrcReader::_create_file_reader() { } Status OrcReader::init_reader( - const std::vector* column_names, const VExprContextSPtrs& conjuncts, - bool is_acid, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, + const std::vector* column_names, + std::unordered_map* col_name_to_block_idx, + const VExprContextSPtrs& conjuncts, bool is_acid, const TupleDescriptor* tuple_descriptor, + const RowDescriptor* row_descriptor, const VExprContextSPtrs* not_single_slot_filter_conjuncts, const std::unordered_map* slot_id_to_filter_conjuncts, std::shared_ptr table_info_node_ptr, const std::set& column_ids, const std::set& filter_column_ids) { _table_column_names = column_names; + _col_name_to_block_idx = col_name_to_block_idx; _lazy_read_ctx.conjuncts = conjuncts; _is_acid = is_acid; _tuple_descriptor = tuple_descriptor; @@ -1334,10 +1337,9 @@ Status OrcReader::_fill_partition_columns( const std::unordered_map>& partition_columns) { DataTypeSerDe::FormatOptions _text_formatOptions; - // todo: maybe do not need to build name to index map every time - auto name_to_pos_map = block->get_name_to_pos_map(); for (const auto& kv : partition_columns) { - auto col_ptr = block->get_by_position(name_to_pos_map[kv.first]).column->assume_mutable(); + auto col_ptr = block->get_by_position((*_col_name_to_block_idx)[kv.first]) + .column->assume_mutable(); const auto& [value, slot_desc] = kv.second; auto text_serde = slot_desc->get_data_type_ptr()->get_serde(); Slice slice(value.data(), value.size()); @@ -1362,18 +1364,16 @@ Status OrcReader::_fill_partition_columns( Status OrcReader::_fill_missing_columns( Block* block, uint64_t rows, const std::unordered_map& missing_columns) { - // todo: maybe do not need to build name to index map every time - auto name_to_pos_map = block->get_name_to_pos_map(); std::set positions_to_erase; for (const auto& kv : missing_columns) { - if (!name_to_pos_map.contains(kv.first)) { + if (!_col_name_to_block_idx->contains(kv.first)) { return Status::InternalError("Failed to find missing column: {}, block: {}", kv.first, block->dump_structure()); } if (kv.second == nullptr) { // no default column, fill with null - auto mutable_column = - block->get_by_position(name_to_pos_map[kv.first]).column->assume_mutable(); + auto mutable_column = block->get_by_position((*_col_name_to_block_idx)[kv.first]) + .column->assume_mutable(); auto* nullable_column = static_cast(mutable_column.get()); nullable_column->insert_many_defaults(rows); } else { @@ -1393,10 +1393,11 @@ Status OrcReader::_fill_missing_columns( mutable_column->resize(rows); // result_column_ptr maybe a ColumnConst, convert it to a normal column result_column_ptr = result_column_ptr->convert_to_full_column_if_const(); - auto origin_column_type = block->get_by_position(name_to_pos_map[kv.first]).type; + auto origin_column_type = + block->get_by_position((*_col_name_to_block_idx)[kv.first]).type; bool is_nullable = origin_column_type->is_nullable(); block->replace_by_position( - name_to_pos_map[kv.first], + (*_col_name_to_block_idx)[kv.first], is_nullable ? make_nullable(result_column_ptr) : result_column_ptr); positions_to_erase.insert(result_column_id); } @@ -2234,10 +2235,9 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo std::vector batch_vec; _fill_batch_vec(batch_vec, _batch.get(), 0); - // todo: maybe do not need to build name to index map every time - auto name_to_pos_map = block->get_name_to_pos_map(); for (auto& col_name : _lazy_read_ctx.lazy_read_columns) { - auto& column_with_type_and_name = block->get_by_position(name_to_pos_map[col_name]); + auto& column_with_type_and_name = + block->get_by_position((*_col_name_to_block_idx)[col_name]); auto& column_ptr = column_with_type_and_name.column; auto& column_type = column_with_type_and_name.type; auto file_column_name = _table_info_node_ptr->children_file_column_name(col_name); @@ -2303,17 +2303,15 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo } } - // todo: maybe do not need to build name to index map every time - auto name_to_pos_map = block->get_name_to_pos_map(); if (!_dict_cols_has_converted && !_dict_filter_cols.empty()) { for (auto& dict_filter_cols : _dict_filter_cols) { MutableColumnPtr dict_col_ptr = ColumnInt32::create(); - if (!name_to_pos_map.contains(dict_filter_cols.first)) { + if (!_col_name_to_block_idx->contains(dict_filter_cols.first)) { return Status::InternalError( "Failed to find dict filter column '{}' in block {}", dict_filter_cols.first, block->dump_structure()); } - auto pos = name_to_pos_map[dict_filter_cols.first]; + auto pos = (*_col_name_to_block_idx)[dict_filter_cols.first]; auto& column_with_type_and_name = block->get_by_position(pos); auto& column_type = column_with_type_and_name.type; if (column_type->is_nullable()) { @@ -2335,7 +2333,8 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo _fill_batch_vec(batch_vec, _batch.get(), 0); for (auto& col_name : _lazy_read_ctx.all_read_columns) { - auto& column_with_type_and_name = block->get_by_position(name_to_pos_map[col_name]); + auto& column_with_type_and_name = + block->get_by_position((*_col_name_to_block_idx)[col_name]); auto& column_ptr = column_with_type_and_name.column; auto& column_type = column_with_type_and_name.type; auto file_column_name = _table_info_node_ptr->children_file_column_name(col_name); @@ -2446,17 +2445,17 @@ void OrcReader::_build_delete_row_filter(const Block* block, size_t rows) { if (_delete_rows != nullptr) { _delete_rows_filter_ptr = std::make_unique(rows, 1); auto* __restrict _pos_delete_filter_data = _delete_rows_filter_ptr->data(); - // todo: maybe do not need to build name to index map every time - auto name_to_pos_map = block->get_name_to_pos_map(); const auto& original_transaction_column = assert_cast(*remove_nullable( - block->get_by_position( - name_to_pos_map[TransactionalHive::ORIGINAL_TRANSACTION_LOWER_CASE]) + block->get_by_position((*_col_name_to_block_idx) + [TransactionalHive::ORIGINAL_TRANSACTION_LOWER_CASE]) .column)); const auto& bucket_id_column = assert_cast(*remove_nullable( - block->get_by_position(name_to_pos_map[TransactionalHive::BUCKET_LOWER_CASE]) + block->get_by_position( + (*_col_name_to_block_idx)[TransactionalHive::BUCKET_LOWER_CASE]) .column)); const auto& row_id_column = assert_cast(*remove_nullable( - block->get_by_position(name_to_pos_map[TransactionalHive::ROW_ID_LOWER_CASE]) + block->get_by_position( + (*_col_name_to_block_idx)[TransactionalHive::ROW_ID_LOWER_CASE]) .column)); for (int i = 0; i < rows; ++i) { auto original_transaction = original_transaction_column.get_int(i); @@ -2480,15 +2479,13 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s size_t origin_column_num = block->columns(); if (!_dict_cols_has_converted && !_dict_filter_cols.empty()) { - // todo: maybe do not need to build name to index map every time - auto name_to_pos_map = block->get_name_to_pos_map(); for (auto& dict_filter_cols : _dict_filter_cols) { - if (!name_to_pos_map.contains(dict_filter_cols.first)) { + if (!_col_name_to_block_idx->contains(dict_filter_cols.first)) { return Status::InternalError("Failed to find dict filter column '{}' in block {}", dict_filter_cols.first, block->dump_structure()); } MutableColumnPtr dict_col_ptr = ColumnInt32::create(); - auto pos = name_to_pos_map[dict_filter_cols.first]; + auto pos = (*_col_name_to_block_idx)[dict_filter_cols.first]; auto& column_with_type_and_name = block->get_by_position(pos); auto& column_type = column_with_type_and_name.type; if (column_type->is_nullable()) { @@ -2514,10 +2511,9 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(), TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end()); } - // todo: maybe do not need to build name to index map every time - auto name_to_pos_map = block->get_name_to_pos_map(); for (auto& table_col_name : table_col_names) { - auto& column_with_type_and_name = block->get_by_position(name_to_pos_map[table_col_name]); + auto& column_with_type_and_name = + block->get_by_position((*_col_name_to_block_idx)[table_col_name]); auto& column_ptr = column_with_type_and_name.column; auto& column_type = column_with_type_and_name.type; auto file_column_name = _table_info_node_ptr->children_file_column_name(table_col_name); @@ -2569,13 +2565,19 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s if (can_filter_all) { for (auto& col : table_col_names) { // clean block to read predicate columns and acid columns - block->get_by_position(name_to_pos_map[col]).column->assume_mutable()->clear(); + block->get_by_position((*_col_name_to_block_idx)[col]) + .column->assume_mutable() + ->clear(); } for (auto& col : _lazy_read_ctx.predicate_partition_columns) { - block->get_by_position(name_to_pos_map[col.first]).column->assume_mutable()->clear(); + block->get_by_position((*_col_name_to_block_idx)[col.first]) + .column->assume_mutable() + ->clear(); } for (auto& col : _lazy_read_ctx.predicate_missing_columns) { - block->get_by_position(name_to_pos_map[col.first]).column->assume_mutable()->clear(); + block->get_by_position((*_col_name_to_block_idx)[col.first]) + .column->assume_mutable() + ->clear(); } Block::erase_useless_column(block, origin_column_num); RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr)); @@ -2885,14 +2887,12 @@ Status OrcReader::_convert_dict_cols_to_string_cols( return Status::OK(); } if (!_dict_filter_cols.empty()) { - // todo: maybe do not need to build name to index map every time - auto name_to_pos_map = block->get_name_to_pos_map(); for (auto& dict_filter_cols : _dict_filter_cols) { - if (!name_to_pos_map.contains(dict_filter_cols.first)) { + if (!_col_name_to_block_idx->contains(dict_filter_cols.first)) { return Status::InternalError("Failed to find dict filter column '{}' in block {}", dict_filter_cols.first, block->dump_structure()); } - auto pos = name_to_pos_map[dict_filter_cols.first]; + auto pos = (*_col_name_to_block_idx)[dict_filter_cols.first]; ColumnWithTypeAndName& column_with_type_and_name = block->get_by_position(pos); const ColumnPtr& column = column_with_type_and_name.column; diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index c41e6be4131149..4e81631491ce96 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -157,9 +157,10 @@ class OrcReader : public GenericReader { ~OrcReader() override = default; //If you want to read the file by index instead of column name, set hive_use_column_names to false. Status init_reader( - const std::vector* column_names, const VExprContextSPtrs& conjuncts, - bool is_acid, const TupleDescriptor* tuple_descriptor, - const RowDescriptor* row_descriptor, + const std::vector* column_names, + std::unordered_map* col_name_to_block_idx, + const VExprContextSPtrs& conjuncts, bool is_acid, + const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, const VExprContextSPtrs* not_single_slot_filter_conjuncts, const std::unordered_map* slot_id_to_filter_conjuncts, std::shared_ptr table_info_node_ptr = @@ -726,6 +727,9 @@ class OrcReader : public GenericReader { std::set _column_ids; std::set _filter_column_ids; + // Pointer to external column name to block index mapping (from FileScanner) + std::unordered_map* _col_name_to_block_idx = nullptr; + VExprSPtrs _push_down_exprs; }; diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 58f1c337555452..5c69eec7e51ac3 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -391,32 +391,32 @@ Status RowGroupReader::_read_column_data(Block* block, FilterMap& filter_map) { size_t batch_read_rows = 0; bool has_eof = false; - // todo: maybe do not need to build name to index map every time - auto name_to_idx = block->get_name_to_pos_map(); for (auto& read_col_name : table_columns) { - auto& column_with_type_and_name = block->safe_get_by_position(name_to_idx[read_col_name]); + auto& column_with_type_and_name = + block->safe_get_by_position((*_col_name_to_block_idx)[read_col_name]); auto& column_ptr = column_with_type_and_name.column; auto& column_type = column_with_type_and_name.type; bool is_dict_filter = false; for (auto& _dict_filter_col : _dict_filter_cols) { if (_dict_filter_col.first == read_col_name) { MutableColumnPtr dict_column = ColumnInt32::create(); - if (!name_to_idx.contains(read_col_name)) { + if (!_col_name_to_block_idx->contains(read_col_name)) { return Status::InternalError( "Wrong read column '{}' in parquet file, block: {}", read_col_name, block->dump_structure()); } if (column_type->is_nullable()) { - block->get_by_position(name_to_idx[read_col_name]).type = + block->get_by_position((*_col_name_to_block_idx)[read_col_name]).type = std::make_shared(std::make_shared()); block->replace_by_position( - name_to_idx[read_col_name], + (*_col_name_to_block_idx)[read_col_name], ColumnNullable::create(std::move(dict_column), ColumnUInt8::create(dict_column->size(), 0))); } else { - block->get_by_position(name_to_idx[read_col_name]).type = + block->get_by_position((*_col_name_to_block_idx)[read_col_name]).type = std::make_shared(); - block->replace_by_position(name_to_idx[read_col_name], std::move(dict_column)); + block->replace_by_position((*_col_name_to_block_idx)[read_col_name], + std::move(dict_column)); } is_dict_filter = true; break; @@ -531,18 +531,19 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re if (filter_map_ptr->filter_all()) { { SCOPED_RAW_TIMER(&_predicate_filter_time); - auto name_to_idx = block->get_name_to_pos_map(); for (const auto& col : _lazy_read_ctx.predicate_columns.first) { // clean block to read predicate columns - block->get_by_position(name_to_idx[col]).column->assume_mutable()->clear(); + block->get_by_position((*_col_name_to_block_idx)[col]) + .column->assume_mutable() + ->clear(); } for (const auto& col : _lazy_read_ctx.predicate_partition_columns) { - block->get_by_position(name_to_idx[col.first]) + block->get_by_position((*_col_name_to_block_idx)[col.first]) .column->assume_mutable() ->clear(); } for (const auto& col : _lazy_read_ctx.predicate_missing_columns) { - block->get_by_position(name_to_idx[col.first]) + block->get_by_position((*_col_name_to_block_idx)[col.first]) .column->assume_mutable() ->clear(); } @@ -675,9 +676,8 @@ Status RowGroupReader::_fill_partition_columns( const std::unordered_map>& partition_columns) { DataTypeSerDe::FormatOptions _text_formatOptions; - auto name_to_idx = block->get_name_to_pos_map(); for (const auto& kv : partition_columns) { - auto doris_column = block->get_by_position(name_to_idx[kv.first]).column; + auto doris_column = block->get_by_position((*_col_name_to_block_idx)[kv.first]).column; // obtained from block*, it is a mutable object. auto* col_ptr = const_cast(doris_column.get()); const auto& [value, slot_desc] = kv.second; @@ -705,18 +705,16 @@ Status RowGroupReader::_fill_partition_columns( Status RowGroupReader::_fill_missing_columns( Block* block, size_t rows, const std::unordered_map& missing_columns) { - // todo: maybe do not need to build name to index map every time - auto name_to_idx = block->get_name_to_pos_map(); std::set positions_to_erase; for (const auto& kv : missing_columns) { - if (!name_to_idx.contains(kv.first)) { + if (!_col_name_to_block_idx->contains(kv.first)) { return Status::InternalError("Missing column: {} not found in block {}", kv.first, block->dump_structure()); } if (kv.second == nullptr) { // no default column, fill with null - auto mutable_column = - block->get_by_position(name_to_idx[kv.first]).column->assume_mutable(); + auto mutable_column = block->get_by_position((*_col_name_to_block_idx)[kv.first]) + .column->assume_mutable(); auto* nullable_column = assert_cast(mutable_column.get()); nullable_column->insert_many_defaults(rows); } else { @@ -736,10 +734,11 @@ Status RowGroupReader::_fill_missing_columns( mutable_column->resize(rows); // result_column_ptr maybe a ColumnConst, convert it to a normal column result_column_ptr = result_column_ptr->convert_to_full_column_if_const(); - auto origin_column_type = block->get_by_position(name_to_idx[kv.first]).type; + auto origin_column_type = + block->get_by_position((*_col_name_to_block_idx)[kv.first]).type; bool is_nullable = origin_column_type->is_nullable(); block->replace_by_position( - name_to_idx[kv.first], + (*_col_name_to_block_idx)[kv.first], is_nullable ? make_nullable(result_column_ptr) : result_column_ptr); positions_to_erase.insert(result_column_id); } @@ -1095,16 +1094,14 @@ Status RowGroupReader::_rewrite_dict_conjuncts(std::vector& dict_codes, } void RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) { - // todo: maybe do not need to build name to index map every time - auto name_to_idx = block->get_name_to_pos_map(); for (auto& dict_filter_cols : _dict_filter_cols) { - if (!name_to_idx.contains(dict_filter_cols.first)) { + if (!_col_name_to_block_idx->contains(dict_filter_cols.first)) { throw Exception(ErrorCode::INTERNAL_ERROR, "Wrong read column '{}' in parquet file, block: {}", dict_filter_cols.first, block->dump_structure()); } ColumnWithTypeAndName& column_with_type_and_name = - block->get_by_position(name_to_idx[dict_filter_cols.first]); + block->get_by_position((*_col_name_to_block_idx)[dict_filter_cols.first]); const ColumnPtr& column = column_with_type_and_name.column; if (const auto* nullable_column = check_and_get_column(*column)) { const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr(); @@ -1118,7 +1115,7 @@ void RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) { column_with_type_and_name.type = std::make_shared(std::make_shared()); block->replace_by_position( - name_to_idx[dict_filter_cols.first], + (*_col_name_to_block_idx)[dict_filter_cols.first], ColumnNullable::create(std::move(string_column), nullable_column->get_null_map_column_ptr())); } else { @@ -1128,7 +1125,7 @@ void RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) { dict_column); column_with_type_and_name.type = std::make_shared(); - block->replace_by_position(name_to_idx[dict_filter_cols.first], + block->replace_by_position((*_col_name_to_block_idx)[dict_filter_cols.first], std::move(string_column)); } } diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h index 265a95f4470537..f81e7712bc6057 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h @@ -179,6 +179,11 @@ class RowGroupReader : public ProfileCollector { _current_row_group_idx = row_group_idx; } + void set_col_name_to_block_idx( + std::unordered_map* col_name_to_block_idx) { + _col_name_to_block_idx = col_name_to_block_idx; + } + protected: void _collect_profile_before_close() override { if (_file_reader != nullptr) { @@ -260,6 +265,8 @@ class RowGroupReader : public ProfileCollector { std::pair, int> _row_id_column_iterator_pair = {nullptr, -1}; std::vector _current_batch_row_ids; + + std::unordered_map* _col_name_to_block_idx = nullptr; }; #include "common/compile_check_end.h" diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 10e2c1a0f3939b..54732fe1f384c7 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -327,13 +327,16 @@ void ParquetReader::_init_file_description() { } Status ParquetReader::init_reader( - const std::vector& all_column_names, const VExprContextSPtrs& conjuncts, - const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, + const std::vector& all_column_names, + std::unordered_map* col_name_to_block_idx, + const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, + const RowDescriptor* row_descriptor, const std::unordered_map* colname_to_slot_id, const VExprContextSPtrs* not_single_slot_filter_conjuncts, const std::unordered_map* slot_id_to_filter_conjuncts, std::shared_ptr table_info_node_ptr, bool filter_groups, const std::set& column_ids, const std::set& filter_column_ids) { + _col_name_to_block_idx = col_name_to_block_idx; _tuple_descriptor = tuple_descriptor; _row_descriptor = row_descriptor; _colname_to_slot_id = colname_to_slot_id; @@ -799,6 +802,7 @@ Status ParquetReader::_next_row_group_reader() { _current_group_reader->set_current_row_group_idx(_current_row_group_index); _current_group_reader->set_row_id_column_iterator(_row_id_column_iterator_pair); + _current_group_reader->set_col_name_to_block_idx(_col_name_to_block_idx); _current_group_reader->_table_info_node_ptr = _table_info_node_ptr; return _current_group_reader->init(_file_metadata->schema(), candidate_row_ranges, _col_offsets, diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index 26cc7a7436d2ca..ed848752ec34e7 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -117,8 +117,10 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper { #endif Status init_reader( - const std::vector& all_column_names, const VExprContextSPtrs& conjuncts, - const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, + const std::vector& all_column_names, + std::unordered_map* col_name_to_block_idx, + const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, + const RowDescriptor* row_descriptor, const std::unordered_map* colname_to_slot_id, const VExprContextSPtrs* not_single_slot_filter_conjuncts, const std::unordered_map* slot_id_to_filter_conjuncts, @@ -336,6 +338,8 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper { std::set _column_ids; std::set _filter_column_ids; + std::unordered_map* _col_name_to_block_idx = nullptr; + // Since the filtering conditions for topn are dynamic, the filtering is delayed until create next row group reader. VExprSPtrs _top_runtime_vexprs; std::vector> _push_down_predicates; diff --git a/be/src/vec/exec/format/table/equality_delete.cpp b/be/src/vec/exec/format/table/equality_delete.cpp index 48914a021441d8..20162e082f1bc9 100644 --- a/be/src/vec/exec/format/table/equality_delete.cpp +++ b/be/src/vec/exec/format/table/equality_delete.cpp @@ -43,14 +43,11 @@ Status SimpleEqualityDelete::_build_set() { return Status::OK(); } -Status SimpleEqualityDelete::filter_data_block(Block* data_block) { +Status SimpleEqualityDelete::filter_data_block( + Block* data_block, const std::unordered_map* col_name_to_block_idx) { SCOPED_TIMER(equality_delete_time); - int pos = data_block->get_position_by_name(_delete_column_name); - if (pos == -1) { - return Status::InternalError("Column '{}' not found in data block: {}", _delete_column_name, - data_block->dump_structure()); - } - auto column_and_type = data_block->get_by_position(pos); + auto column_and_type = + data_block->get_by_position(col_name_to_block_idx->at(_delete_column_name)); if (column_and_type.type->get_primitive_type() != _delete_column_type) { return Status::InternalError( "Not support type change in column '{}', src type: {}, target type: {}", @@ -106,25 +103,25 @@ Status MultiEqualityDelete::_build_set() { return Status::OK(); } -Status MultiEqualityDelete::filter_data_block(Block* data_block) { +Status MultiEqualityDelete::filter_data_block( + Block* data_block, const std::unordered_map* col_name_to_block_idx) { SCOPED_TIMER(equality_delete_time); size_t column_index = 0; - // todo: maybe do not need to build name to index map every time - auto name_to_pos_map = data_block->get_name_to_pos_map(); for (auto delete_col : _delete_block->get_columns_with_type_and_name()) { const std::string& column_name = delete_col.name; - auto column_and_type = data_block->safe_get_by_position(name_to_pos_map[column_name]); - if (name_to_pos_map.contains(column_name) == false) { + if (!col_name_to_block_idx->contains(column_name)) { return Status::InternalError("Column '{}' not found in data block: {}", column_name, data_block->dump_structure()); } + auto column_and_type = + data_block->safe_get_by_position(col_name_to_block_idx->at(column_name)); if (!delete_col.type->equals(*column_and_type.type)) { return Status::InternalError( "Not support type change in column '{}', src type: {}, target type: {}", column_name, delete_col.type->get_name(), column_and_type.type->get_name()); } - _data_column_index[column_index++] = name_to_pos_map[column_name]; + _data_column_index[column_index++] = col_name_to_block_idx->at(column_name); } size_t rows = data_block->rows(); _data_hashes.clear(); diff --git a/be/src/vec/exec/format/table/equality_delete.h b/be/src/vec/exec/format/table/equality_delete.h index fbcbcfdaa77fa7..1c9f91a01cb7e9 100644 --- a/be/src/vec/exec/format/table/equality_delete.h +++ b/be/src/vec/exec/format/table/equality_delete.h @@ -56,7 +56,9 @@ class EqualityDeleteBase { return _build_set(); } - virtual Status filter_data_block(Block* data_block) = 0; + virtual Status filter_data_block( + Block* data_block, + const std::unordered_map* col_name_to_block_idx) = 0; static std::unique_ptr get_delete_impl(Block* delete_block); }; @@ -73,7 +75,9 @@ class SimpleEqualityDelete : public EqualityDeleteBase { public: SimpleEqualityDelete(Block* delete_block) : EqualityDeleteBase(delete_block) {} - Status filter_data_block(Block* data_block) override; + Status filter_data_block( + Block* data_block, + const std::unordered_map* col_name_to_block_idx) override; }; /** @@ -100,7 +104,9 @@ class MultiEqualityDelete : public EqualityDeleteBase { public: MultiEqualityDelete(Block* delete_block) : EqualityDeleteBase(delete_block) {} - Status filter_data_block(Block* data_block) override; + Status filter_data_block( + Block* data_block, + const std::unordered_map* col_name_to_block_idx) override; }; #include "common/compile_check_end.h" diff --git a/be/src/vec/exec/format/table/hive_reader.cpp b/be/src/vec/exec/format/table/hive_reader.cpp index 35ae936cc27b3b..ac004230bd0aab 100644 --- a/be/src/vec/exec/format/table/hive_reader.cpp +++ b/be/src/vec/exec/format/table/hive_reader.cpp @@ -34,8 +34,10 @@ Status HiveReader::get_next_block_inner(Block* block, size_t* read_rows, bool* e }; Status HiveOrcReader::init_reader( - const std::vector& read_table_col_names, const VExprContextSPtrs& conjuncts, - const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, + const std::vector& read_table_col_names, + std::unordered_map* col_name_to_block_idx, + const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, + const RowDescriptor* row_descriptor, const VExprContextSPtrs* not_single_slot_filter_conjuncts, const std::unordered_map* slot_id_to_filter_conjuncts) { auto* orc_reader = static_cast(_file_format_reader.get()); @@ -89,10 +91,10 @@ Status HiveOrcReader::init_reader( const auto& column_ids = column_id_result.column_ids; const auto& filter_column_ids = column_id_result.filter_column_ids; - return orc_reader->init_reader(&read_table_col_names, conjuncts, false, tuple_descriptor, - row_descriptor, not_single_slot_filter_conjuncts, - slot_id_to_filter_conjuncts, table_info_node_ptr, column_ids, - filter_column_ids); + return orc_reader->init_reader(&read_table_col_names, col_name_to_block_idx, conjuncts, false, + tuple_descriptor, row_descriptor, + not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts, + table_info_node_ptr, column_ids, filter_column_ids); } ColumnIdResult HiveOrcReader::_create_column_ids(const orc::Type* orc_type, @@ -209,8 +211,10 @@ ColumnIdResult HiveOrcReader::_create_column_ids_by_top_level_col_index( } Status HiveParquetReader::init_reader( - const std::vector& read_table_col_names, const VExprContextSPtrs& conjuncts, - const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, + const std::vector& read_table_col_names, + std::unordered_map* col_name_to_block_idx, + const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, + const RowDescriptor* row_descriptor, const std::unordered_map* colname_to_slot_id, const VExprContextSPtrs* not_single_slot_filter_conjuncts, const std::unordered_map* slot_id_to_filter_conjuncts) { @@ -281,9 +285,9 @@ Status HiveParquetReader::init_reader( RETURN_IF_ERROR(init_row_filters()); return parquet_reader->init_reader( - read_table_col_names, conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id, - not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts, table_info_node_ptr, - true, column_ids, filter_column_ids); + read_table_col_names, col_name_to_block_idx, conjuncts, tuple_descriptor, + row_descriptor, colname_to_slot_id, not_single_slot_filter_conjuncts, + slot_id_to_filter_conjuncts, table_info_node_ptr, true, column_ids, filter_column_ids); } ColumnIdResult HiveParquetReader::_create_column_ids(const FieldDescriptor* field_desc, diff --git a/be/src/vec/exec/format/table/hive_reader.h b/be/src/vec/exec/format/table/hive_reader.h index a817209f31c42f..a47ac8164eb486 100644 --- a/be/src/vec/exec/format/table/hive_reader.h +++ b/be/src/vec/exec/format/table/hive_reader.h @@ -60,6 +60,7 @@ class HiveOrcReader final : public HiveReader { Status init_reader( const std::vector& read_table_col_names, + std::unordered_map* col_name_to_block_idx, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, const VExprContextSPtrs* not_single_slot_filter_conjuncts, @@ -86,6 +87,7 @@ class HiveParquetReader final : public HiveReader { Status init_reader( const std::vector& read_table_col_names, + std::unordered_map* col_name_to_block_idx, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, const std::unordered_map* colname_to_slot_id, diff --git a/be/src/vec/exec/format/table/hudi_reader.cpp b/be/src/vec/exec/format/table/hudi_reader.cpp index 62f2a034c35c45..d7bd32ae4987e7 100644 --- a/be/src/vec/exec/format/table/hudi_reader.cpp +++ b/be/src/vec/exec/format/table/hudi_reader.cpp @@ -31,8 +31,10 @@ Status HudiReader::get_next_block_inner(Block* block, size_t* read_rows, bool* e }; Status HudiParquetReader::init_reader( - const std::vector& read_table_col_names, const VExprContextSPtrs& conjuncts, - const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, + const std::vector& read_table_col_names, + std::unordered_map* col_name_to_block_idx, + const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, + const RowDescriptor* row_descriptor, const std::unordered_map* colname_to_slot_id, const VExprContextSPtrs* not_single_slot_filter_conjuncts, const std::unordered_map* slot_id_to_filter_conjuncts) { @@ -45,9 +47,10 @@ Status HudiParquetReader::init_reader( RETURN_IF_ERROR(gen_table_info_node_by_field_id( _params, _range.table_format_params.hudi_params.schema_id, tuple_descriptor, *field_desc)); - return parquet_reader->init_reader( - read_table_col_names, conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id, - not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts, table_info_node_ptr); + return parquet_reader->init_reader(read_table_col_names, col_name_to_block_idx, conjuncts, + tuple_descriptor, row_descriptor, colname_to_slot_id, + not_single_slot_filter_conjuncts, + slot_id_to_filter_conjuncts, table_info_node_ptr); } #include "common/compile_check_end.h" diff --git a/be/src/vec/exec/format/table/hudi_reader.h b/be/src/vec/exec/format/table/hudi_reader.h index 781c8dedf8f969..ff15eb78c933ae 100644 --- a/be/src/vec/exec/format/table/hudi_reader.h +++ b/be/src/vec/exec/format/table/hudi_reader.h @@ -50,6 +50,7 @@ class HudiParquetReader final : public HudiReader { Status init_reader( const std::vector& read_table_col_names, + std::unordered_map* col_name_to_block_idx, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, const std::unordered_map* colname_to_slot_id, @@ -69,6 +70,7 @@ class HudiOrcReader final : public HudiReader { Status init_reader( const std::vector& read_table_col_names, + std::unordered_map* col_name_to_block_idx, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, const VExprContextSPtrs* not_single_slot_filter_conjuncts, @@ -80,8 +82,9 @@ class HudiOrcReader final : public HudiReader { _params, _range.table_format_params.hudi_params.schema_id, tuple_descriptor, orc_type_ptr)); - return orc_reader->init_reader(&read_table_col_names, conjuncts, false, tuple_descriptor, - row_descriptor, not_single_slot_filter_conjuncts, + return orc_reader->init_reader(&read_table_col_names, col_name_to_block_idx, conjuncts, + false, tuple_descriptor, row_descriptor, + not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts, table_info_node_ptr); } }; diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index f8d0999ac1b236..7a6f0fcb0e028c 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -104,7 +104,7 @@ Status IcebergTableReader::get_next_block_inner(Block* block, size_t* read_rows, RETURN_IF_ERROR(_file_format_reader->get_next_block(block, read_rows, eof)); if (_equality_delete_impl != nullptr) { - RETURN_IF_ERROR(_equality_delete_impl->filter_data_block(block)); + RETURN_IF_ERROR(_equality_delete_impl->filter_data_block(block, _col_name_to_block_idx)); *read_rows = block->rows(); } return _shrink_block_if_need(block); @@ -150,6 +150,7 @@ Status IcebergTableReader::_equality_delete_base( const std::vector& delete_files) { bool init_schema = false; std::vector equality_delete_col_names; + std::unordered_map delete_col_name_to_block_idx; std::vector equality_delete_col_types; std::unordered_map> partition_columns; @@ -170,15 +171,20 @@ Status IcebergTableReader::_equality_delete_base( &equality_delete_col_types)); _generate_equality_delete_block(&_equality_delete_block, equality_delete_col_names, equality_delete_col_types); + for (uint32_t idx = 0; idx < equality_delete_col_names.size(); ++idx) { + delete_col_name_to_block_idx[equality_delete_col_names[idx]] = idx; + } init_schema = true; } if (auto* parquet_reader = typeid_cast(delete_reader.get())) { RETURN_IF_ERROR(parquet_reader->init_reader( - equality_delete_col_names, {}, nullptr, nullptr, nullptr, nullptr, nullptr, - TableSchemaChangeHelper::ConstNode::get_instance(), false)); + equality_delete_col_names, &delete_col_name_to_block_idx, {}, nullptr, nullptr, + nullptr, nullptr, nullptr, TableSchemaChangeHelper::ConstNode::get_instance(), + false)); } else if (auto* orc_reader = typeid_cast(delete_reader.get())) { - RETURN_IF_ERROR(orc_reader->init_reader(&equality_delete_col_names, {}, false, {}, {}, - nullptr, nullptr)); + RETURN_IF_ERROR(orc_reader->init_reader(&equality_delete_col_names, + &delete_col_name_to_block_idx, {}, false, {}, + {}, nullptr, nullptr)); } else { return Status::InternalError("Unsupported format of delete file"); } @@ -236,23 +242,25 @@ Status IcebergTableReader::_expand_block_if_need(Block* block) { return Status::InternalError("Wrong expand column '{}'", col.name); } names.insert(col.name); + (*_col_name_to_block_idx)[col.name] = static_cast(block->columns()); block->insert(col); } return Status::OK(); } Status IcebergTableReader::_shrink_block_if_need(Block* block) { - // todo: maybe do not need to build name to index map every time - auto name_to_pos_map = block->get_name_to_pos_map(); std::set positions_to_erase; for (const std::string& expand_col : _expand_col_names) { - if (!name_to_pos_map.contains(expand_col)) { + if (!_col_name_to_block_idx->contains(expand_col)) { return Status::InternalError("Wrong erase column '{}', block: {}", expand_col, block->dump_names()); } - positions_to_erase.emplace(name_to_pos_map[expand_col]); + positions_to_erase.emplace((*_col_name_to_block_idx)[expand_col]); } block->erase(positions_to_erase); + for (const std::string& expand_col : _expand_col_names) { + _col_name_to_block_idx->erase(expand_col); + } return Status::OK(); } @@ -433,12 +441,15 @@ void IcebergTableReader::_gen_position_delete_file_range(Block& block, DeleteFil } Status IcebergParquetReader::init_reader( - const std::vector& file_col_names, const VExprContextSPtrs& conjuncts, - const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, + const std::vector& file_col_names, + std::unordered_map* col_name_to_block_idx, + const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, + const RowDescriptor* row_descriptor, const std::unordered_map* colname_to_slot_id, const VExprContextSPtrs* not_single_slot_filter_conjuncts, const std::unordered_map* slot_id_to_filter_conjuncts) { _file_format = Fileformat::PARQUET; + _col_name_to_block_idx = col_name_to_block_idx; auto* parquet_reader = static_cast(_file_format_reader.get()); const FieldDescriptor* field_desc = nullptr; RETURN_IF_ERROR(parquet_reader->get_file_metadata_schema(&field_desc)); @@ -476,9 +487,9 @@ Status IcebergParquetReader::init_reader( } } return parquet_reader->init_reader( - _all_required_col_names, conjuncts, tuple_descriptor, row_descriptor, - colname_to_slot_id, not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts, - table_info_node_ptr, true, column_ids, filter_column_ids); + _all_required_col_names, _col_name_to_block_idx, conjuncts, tuple_descriptor, + row_descriptor, colname_to_slot_id, not_single_slot_filter_conjuncts, + slot_id_to_filter_conjuncts, table_info_node_ptr, true, column_ids, filter_column_ids); } ColumnIdResult IcebergParquetReader::_create_column_ids(const FieldDescriptor* field_desc, @@ -549,7 +560,9 @@ Status IcebergParquetReader ::_read_position_delete_file(const TFileRangeDesc* d READ_DELETE_FILE_BATCH_SIZE, &_state->timezone_obj(), _io_ctx, _state, _meta_cache); RETURN_IF_ERROR(parquet_delete_reader.init_reader( - delete_file_col_names, {}, nullptr, nullptr, nullptr, nullptr, nullptr, + delete_file_col_names, + const_cast*>(&DELETE_COL_NAME_TO_BLOCK_IDX), + {}, nullptr, nullptr, nullptr, nullptr, nullptr, TableSchemaChangeHelper::ConstNode::get_instance(), false)); std::unordered_map> @@ -589,12 +602,15 @@ Status IcebergParquetReader ::_read_position_delete_file(const TFileRangeDesc* d }; Status IcebergOrcReader::init_reader( - const std::vector& file_col_names, const VExprContextSPtrs& conjuncts, - const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, + const std::vector& file_col_names, + std::unordered_map* col_name_to_block_idx, + const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, + const RowDescriptor* row_descriptor, const std::unordered_map* colname_to_slot_id, const VExprContextSPtrs* not_single_slot_filter_conjuncts, const std::unordered_map* slot_id_to_filter_conjuncts) { _file_format = Fileformat::ORC; + _col_name_to_block_idx = col_name_to_block_idx; auto* orc_reader = static_cast(_file_format_reader.get()); const orc::Type* orc_type_ptr = nullptr; RETURN_IF_ERROR(orc_reader->get_file_type(&orc_type_ptr)); @@ -629,10 +645,10 @@ Status IcebergOrcReader::init_reader( column_ids.insert(sub_type->getColumnId()); } } - return orc_reader->init_reader(&_all_required_col_names, conjuncts, false, tuple_descriptor, - row_descriptor, not_single_slot_filter_conjuncts, - slot_id_to_filter_conjuncts, table_info_node_ptr, column_ids, - filter_column_ids); + return orc_reader->init_reader(&_all_required_col_names, _col_name_to_block_idx, conjuncts, + false, tuple_descriptor, row_descriptor, + not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts, + table_info_node_ptr, column_ids, filter_column_ids); } ColumnIdResult IcebergOrcReader::_create_column_ids(const orc::Type* orc_type, @@ -700,8 +716,10 @@ Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete OrcReader orc_delete_reader(_profile, _state, _params, *delete_range, READ_DELETE_FILE_BATCH_SIZE, _state->timezone(), _io_ctx, _meta_cache); - RETURN_IF_ERROR(orc_delete_reader.init_reader(&delete_file_col_names, {}, false, {}, {}, - nullptr, nullptr)); + RETURN_IF_ERROR(orc_delete_reader.init_reader( + &delete_file_col_names, + const_cast*>(&DELETE_COL_NAME_TO_BLOCK_IDX), + {}, false, {}, {}, nullptr, nullptr)); std::unordered_map> partition_columns; diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h index 097fc38dfd7524..56d04cbab82f34 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.h +++ b/be/src/vec/exec/format/table/iceberg_reader.h @@ -133,12 +133,18 @@ class IcebergTableReader : public TableFormatReader, public TableSchemaChangeHel std::vector _expand_columns; std::vector _all_required_col_names; + // Pointer to external column name to block index mapping (from FileScanner) + // Used to dynamically add expand columns for equality delete + std::unordered_map* _col_name_to_block_idx = nullptr; + Fileformat _file_format = Fileformat::NONE; const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2; - const std::string ICEBERG_ROW_POS = "pos"; const std::string ICEBERG_FILE_PATH = "file_path"; - const std::vector delete_file_col_names {ICEBERG_ROW_POS, ICEBERG_FILE_PATH}; + const std::string ICEBERG_ROW_POS = "pos"; + const std::vector delete_file_col_names {ICEBERG_FILE_PATH, ICEBERG_ROW_POS}; + const std::unordered_map DELETE_COL_NAME_TO_BLOCK_IDX = { + {ICEBERG_FILE_PATH, 0}, {ICEBERG_ROW_POS, 1}}; const int ICEBERG_FILE_PATH_INDEX = 0; const int ICEBERG_FILE_POS_INDEX = 1; const int READ_DELETE_FILE_BATCH_SIZE = 102400; @@ -165,8 +171,10 @@ class IcebergParquetReader final : public IcebergTableReader { : IcebergTableReader(std::move(file_format_reader), profile, state, params, range, kv_cache, io_ctx, meta_cache) {} Status init_reader( - const std::vector& file_col_names, const VExprContextSPtrs& conjuncts, - const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, + const std::vector& file_col_names, + std::unordered_map* col_name_to_block_idx, + const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, + const RowDescriptor* row_descriptor, const std::unordered_map* colname_to_slot_id, const VExprContextSPtrs* not_single_slot_filter_conjuncts, const std::unordered_map* slot_id_to_filter_conjuncts); @@ -211,8 +219,10 @@ class IcebergOrcReader final : public IcebergTableReader { } Status init_reader( - const std::vector& file_col_names, const VExprContextSPtrs& conjuncts, - const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, + const std::vector& file_col_names, + std::unordered_map* col_name_to_block_idx, + const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, + const RowDescriptor* row_descriptor, const std::unordered_map* colname_to_slot_id, const VExprContextSPtrs* not_single_slot_filter_conjuncts, const std::unordered_map* slot_id_to_filter_conjuncts); diff --git a/be/src/vec/exec/format/table/paimon_reader.h b/be/src/vec/exec/format/table/paimon_reader.h index 19ef4761299dbc..d5e2ec5a35da42 100644 --- a/be/src/vec/exec/format/table/paimon_reader.h +++ b/be/src/vec/exec/format/table/paimon_reader.h @@ -66,6 +66,7 @@ class PaimonOrcReader final : public PaimonReader { Status init_reader( const std::vector& read_table_col_names, + std::unordered_map* col_name_to_block_idx, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, const VExprContextSPtrs* not_single_slot_filter_conjuncts, @@ -77,8 +78,9 @@ class PaimonOrcReader final : public PaimonReader { _params, _range.table_format_params.paimon_params.schema_id, tuple_descriptor, orc_type_ptr)); - return orc_reader->init_reader(&read_table_col_names, conjuncts, false, tuple_descriptor, - row_descriptor, not_single_slot_filter_conjuncts, + return orc_reader->init_reader(&read_table_col_names, col_name_to_block_idx, conjuncts, + false, tuple_descriptor, row_descriptor, + not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts, table_info_node_ptr); } }; @@ -101,6 +103,7 @@ class PaimonParquetReader final : public PaimonReader { Status init_reader( const std::vector& read_table_col_names, + std::unordered_map* col_name_to_block_idx, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, const std::unordered_map* colname_to_slot_id, @@ -116,8 +119,8 @@ class PaimonParquetReader final : public PaimonReader { _params, _range.table_format_params.paimon_params.schema_id, tuple_descriptor, *field_desc)); - return parquet_reader->init_reader(read_table_col_names, conjuncts, tuple_descriptor, - row_descriptor, colname_to_slot_id, + return parquet_reader->init_reader(read_table_col_names, col_name_to_block_idx, conjuncts, + tuple_descriptor, row_descriptor, colname_to_slot_id, not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts, table_info_node_ptr); } diff --git a/be/src/vec/exec/format/table/remote_doris_reader.cpp b/be/src/vec/exec/format/table/remote_doris_reader.cpp index ca56de0d48f6e5..b8c90f0a94a13e 100644 --- a/be/src/vec/exec/format/table/remote_doris_reader.cpp +++ b/be/src/vec/exec/format/table/remote_doris_reader.cpp @@ -76,20 +76,18 @@ Status RemoteDorisReader::get_next_block(Block* block, size_t* read_rows, bool* auto batch = chunk.data; auto num_rows = batch->num_rows(); auto num_columns = batch->num_columns(); - // todo: maybe do not need to build name to index map every time - auto name_to_pos_map = block->get_name_to_pos_map(); for (int c = 0; c < num_columns; ++c) { arrow::Array* column = batch->column(c).get(); std::string column_name = batch->schema()->field(c)->name(); - if (!name_to_pos_map.contains(column_name)) { + if (!_col_name_to_block_idx->contains(column_name)) { return Status::InternalError("column {} not found in block {}", column_name, block->dump_structure()); } try { const vectorized::ColumnWithTypeAndName& column_with_name = - block->get_by_position(name_to_pos_map[column_name]); + block->get_by_position((*_col_name_to_block_idx)[column_name]); RETURN_IF_ERROR(column_with_name.type->get_serde()->read_column_from_arrow( column_with_name.column->assume_mutable_ref(), column, 0, num_rows, _ctzz)); } catch (Exception& e) { diff --git a/be/src/vec/exec/format/table/remote_doris_reader.h b/be/src/vec/exec/format/table/remote_doris_reader.h index 8884055d99a5c5..41888d2b24b05f 100644 --- a/be/src/vec/exec/format/table/remote_doris_reader.h +++ b/be/src/vec/exec/format/table/remote_doris_reader.h @@ -58,6 +58,14 @@ class RemoteDorisReader : public GenericReader { Status close() override; + /** + * Set column name to block index map from FileScanner to avoid repeated map creation. + */ + void set_col_name_to_block_idx( + std::unordered_map* col_name_to_block_idx) { + _col_name_to_block_idx = col_name_to_block_idx; + } + private: arrow::Status init_stream(); const TFileRangeDesc& _range; @@ -65,6 +73,8 @@ class RemoteDorisReader : public GenericReader { cctz::time_zone _ctzz; std::unique_ptr _flight_client; std::unique_ptr _stream; + // Column name to block index map, passed from FileScanner to avoid repeated map creation + std::unordered_map* _col_name_to_block_idx = nullptr; }; #include "common/compile_check_end.h" } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/transactional_hive_common.cpp b/be/src/vec/exec/format/table/transactional_hive_common.cpp index 7311418b06a4e5..2d8dcb09f1b780 100644 --- a/be/src/vec/exec/format/table/transactional_hive_common.cpp +++ b/be/src/vec/exec/format/table/transactional_hive_common.cpp @@ -17,6 +17,8 @@ #include "transactional_hive_common.h" +#include + namespace doris::vectorized { #include "common/compile_check_begin.h" @@ -70,5 +72,10 @@ const std::vector TransactionalHive::ACID_COLUMN_NAMES_LOWER_CASE = OPERATION_LOWER_CASE, ORIGINAL_TRANSACTION_LOWER_CASE, BUCKET_LOWER_CASE, ROW_ID_LOWER_CASE, CURRENT_TRANSACTION_LOWER_CASE, ROW_LOWER_CASE}; +const std::unordered_map TransactionalHive::DELETE_COL_NAME_TO_BLOCK_IDX = { + {DELETE_ROW_PARAMS[0].column_lower_case, 0}, + {DELETE_ROW_PARAMS[1].column_lower_case, 1}, + {DELETE_ROW_PARAMS[2].column_lower_case, 2}}; + #include "common/compile_check_end.h" } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/transactional_hive_common.h b/be/src/vec/exec/format/table/transactional_hive_common.h index 25943e796708a2..a02d3f86b7f127 100644 --- a/be/src/vec/exec/format/table/transactional_hive_common.h +++ b/be/src/vec/exec/format/table/transactional_hive_common.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include #include "runtime/define_primitive_type.h" @@ -51,6 +52,8 @@ struct TransactionalHive { static const std::vector READ_ROW_COLUMN_NAMES_LOWER_CASE; static const std::vector ACID_COLUMN_NAMES; static const std::vector ACID_COLUMN_NAMES_LOWER_CASE; + + static const std::unordered_map DELETE_COL_NAME_TO_BLOCK_IDX; }; #include "common/compile_check_end.h" } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.cpp b/be/src/vec/exec/format/table/transactional_hive_reader.cpp index bf0c8381c59470..82437fdf9290be 100644 --- a/be/src/vec/exec/format/table/transactional_hive_reader.cpp +++ b/be/src/vec/exec/format/table/transactional_hive_reader.cpp @@ -55,10 +55,13 @@ TransactionalHiveReader::TransactionalHiveReader(std::unique_ptr } Status TransactionalHiveReader::init_reader( - const std::vector& column_names, const VExprContextSPtrs& conjuncts, - const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, + const std::vector& column_names, + std::unordered_map* col_name_to_block_idx, + const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, + const RowDescriptor* row_descriptor, const VExprContextSPtrs* not_single_slot_filter_conjuncts, const std::unordered_map* slot_id_to_filter_conjuncts) { + _col_name_to_block_idx = col_name_to_block_idx; auto* orc_reader = static_cast(_file_format_reader.get()); _col_names.insert(_col_names.end(), column_names.begin(), column_names.end()); _col_names.insert(_col_names.end(), TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(), @@ -108,9 +111,9 @@ Status TransactionalHiveReader::init_reader( } } - Status status = orc_reader->init_reader(&_col_names, conjuncts, true, tuple_descriptor, - row_descriptor, not_single_slot_filter_conjuncts, - slot_id_to_filter_conjuncts, table_info_node_ptr); + Status status = orc_reader->init_reader( + &_col_names, col_name_to_block_idx, conjuncts, true, tuple_descriptor, row_descriptor, + not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts, table_info_node_ptr); return status; } @@ -119,11 +122,15 @@ Status TransactionalHiveReader::get_next_block_inner(Block* block, size_t* read_ DataTypePtr data_type = get_data_type_with_default_argument( DataTypeFactory::instance().create_data_type(i.type, false)); MutableColumnPtr data_column = data_type->create_column(); + (*_col_name_to_block_idx)[i.column_lower_case] = static_cast(block->columns()); block->insert( ColumnWithTypeAndName(std::move(data_column), data_type, i.column_lower_case)); } auto res = _file_format_reader->get_next_block(block, read_rows, eof); Block::erase_useless_column(block, block->columns() - TransactionalHive::READ_PARAMS.size()); + for (const auto& i : TransactionalHive::READ_PARAMS) { + _col_name_to_block_idx->erase(i.column_lower_case); + } return res; } @@ -200,8 +207,10 @@ Status TransactionalHiveReader::init_row_filters() { } RETURN_IF_ERROR(delete_reader.init_reader( - &TransactionalHive::DELETE_ROW_COLUMN_NAMES_LOWER_CASE, {}, false, nullptr, nullptr, - nullptr, nullptr, acid_info_node)); + &TransactionalHive::DELETE_ROW_COLUMN_NAMES_LOWER_CASE, + const_cast*>( + &TransactionalHive::DELETE_COL_NAME_TO_BLOCK_IDX), + {}, false, nullptr, nullptr, nullptr, nullptr, acid_info_node)); std::unordered_map> partition_columns; diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.h b/be/src/vec/exec/format/table/transactional_hive_reader.h index 59702e6e26c33c..29d1b1ec334e68 100644 --- a/be/src/vec/exec/format/table/transactional_hive_reader.h +++ b/be/src/vec/exec/format/table/transactional_hive_reader.h @@ -90,8 +90,10 @@ class TransactionalHiveReader : public TableFormatReader, public TableSchemaChan Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final; Status init_reader( - const std::vector& column_names, const VExprContextSPtrs& conjuncts, - const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, + const std::vector& column_names, + std::unordered_map* col_name_to_block_idx, + const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, + const RowDescriptor* row_descriptor, const VExprContextSPtrs* not_single_slot_filter_conjuncts, const std::unordered_map* slot_id_to_filter_conjuncts); @@ -106,6 +108,8 @@ class TransactionalHiveReader : public TableFormatReader, public TableSchemaChan AcidRowIDSet _delete_rows; std::unique_ptr _delete_rows_filter_ptr; std::vector _col_names; + // Column name to block index map, passed from FileScanner + std::unordered_map* _col_name_to_block_idx = nullptr; }; inline bool operator<(const TransactionalHiveReader::AcidRowID& lhs, diff --git a/be/src/vec/exec/jni_connector.cpp b/be/src/vec/exec/jni_connector.cpp index 166db3050d1c31..f624f18672ca62 100644 --- a/be/src/vec/exec/jni_connector.cpp +++ b/be/src/vec/exec/jni_connector.cpp @@ -313,10 +313,9 @@ Status JniConnector::_fill_block(Block* block, size_t num_rows) { SCOPED_RAW_TIMER(&_fill_block_watcher); JNIEnv* env = nullptr; RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); - // todo: maybe do not need to build name to index map every time - auto name_to_pos_map = block->get_name_to_pos_map(); for (int i = 0; i < _column_names.size(); ++i) { - auto& column_with_type_and_name = block->get_by_position(name_to_pos_map[_column_names[i]]); + auto& column_with_type_and_name = + block->get_by_position(_col_name_to_block_idx->at(_column_names[i])); auto& column_ptr = column_with_type_and_name.column; auto& column_type = column_with_type_and_name.type; RETURN_IF_ERROR(_fill_column(_table_meta, column_ptr, column_type, num_rows)); diff --git a/be/src/vec/exec/jni_connector.h b/be/src/vec/exec/jni_connector.h index 25f5eacd9e5008..12ba6f7b8f64d6 100644 --- a/be/src/vec/exec/jni_connector.h +++ b/be/src/vec/exec/jni_connector.h @@ -256,6 +256,14 @@ class JniConnector : public ProfileCollector { */ Status close(); + /** + * Set column name to block index map from FileScanner to avoid repeated map creation. + */ + void set_col_name_to_block_idx( + const std::unordered_map* col_name_to_block_idx) { + _col_name_to_block_idx = col_name_to_block_idx; + } + static std::string get_jni_type(const DataTypePtr& data_type); static std::string get_jni_type_with_different_string(const DataTypePtr& data_type); @@ -318,6 +326,9 @@ class JniConnector : public ProfileCollector { int _predicates_length = 0; std::unique_ptr _predicates; + // Column name to block index map, passed from FileScanner to avoid repeated map creation + const std::unordered_map* _col_name_to_block_idx = nullptr; + /** * Set the address of meta information, which is returned by org.apache.doris.common.jni.JniScanner#getNextBatchMeta */ diff --git a/be/src/vec/exec/scan/file_scanner.cpp b/be/src/vec/exec/scan/file_scanner.cpp index 9373bcfd056434..e6b3f294284def 100644 --- a/be/src/vec/exec/scan/file_scanner.cpp +++ b/be/src/vec/exec/scan/file_scanner.cpp @@ -537,8 +537,10 @@ Status FileScanner::_init_src_block(Block* block) { if (!_is_load) { _src_block_ptr = block; - // todo: maybe do not need to build name to index map every time - _src_block_name_to_idx = block->get_name_to_pos_map(); + // Build name to index map only once on first call + if (_src_block_name_to_idx.empty()) { + _src_block_name_to_idx = block->get_name_to_pos_map(); + } return Status::OK(); } RETURN_IF_ERROR(_check_output_block_types()); @@ -1026,6 +1028,11 @@ Status FileScanner::_get_next_reader() { _profile, range); init_status = ((TrinoConnectorJniReader*)(_cur_reader.get()))->init_reader(); } + // Set col_name_to_block_idx for JNI readers to avoid repeated map creation + if (_cur_reader) { + static_cast(_cur_reader.get()) + ->set_col_name_to_block_idx(&_src_block_name_to_idx); + } break; } case TFileFormatType::FORMAT_PARQUET: { @@ -1110,6 +1117,11 @@ Status FileScanner::_get_next_reader() { _cur_reader = AvroJNIReader::create_unique(_state, _profile, *_params, _file_slot_descs, range); init_status = ((AvroJNIReader*)(_cur_reader.get()))->init_reader(); + // Set col_name_to_block_idx for JNI readers to avoid repeated map creation + if (_cur_reader) { + static_cast(_cur_reader.get()) + ->set_col_name_to_block_idx(&_src_block_name_to_idx); + } break; } case TFileFormatType::FORMAT_WAL: { @@ -1123,6 +1135,10 @@ Status FileScanner::_get_next_reader() { _cur_reader = RemoteDorisReader::create_unique(_file_slot_descs, _state, _profile, range); init_status = ((RemoteDorisReader*)(_cur_reader.get()))->init_reader(); + if (_cur_reader) { + static_cast(_cur_reader.get()) + ->set_col_name_to_block_idx(&_src_block_name_to_idx); + } } else { _cur_reader = ArrowStreamReader::create_unique(_state, _profile, &_counter, *_params, @@ -1196,7 +1212,7 @@ Status FileScanner::_init_parquet_reader(std::unique_ptr&& parque std::move(parquet_reader), _profile, _state, *_params, range, _kv_cache, _io_ctx.get(), file_meta_cache_ptr); init_status = iceberg_reader->init_reader( - _file_col_names, _push_down_conjuncts, _real_tuple_desc, + _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(), _col_name_to_slot_id, &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts); _cur_reader = std::move(iceberg_reader); @@ -1206,7 +1222,7 @@ Status FileScanner::_init_parquet_reader(std::unique_ptr&& parque std::move(parquet_reader), _profile, _state, *_params, range, _io_ctx.get(), file_meta_cache_ptr); init_status = paimon_reader->init_reader( - _file_col_names, _push_down_conjuncts, _real_tuple_desc, + _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(), _col_name_to_slot_id, &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts); RETURN_IF_ERROR(paimon_reader->init_row_filters()); @@ -1217,7 +1233,7 @@ Status FileScanner::_init_parquet_reader(std::unique_ptr&& parque std::move(parquet_reader), _profile, _state, *_params, range, _io_ctx.get(), file_meta_cache_ptr); init_status = hudi_reader->init_reader( - _file_col_names, _push_down_conjuncts, _real_tuple_desc, + _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(), _col_name_to_slot_id, &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts); _cur_reader = std::move(hudi_reader); @@ -1226,7 +1242,7 @@ Status FileScanner::_init_parquet_reader(std::unique_ptr&& parque _state, *_params, range, _io_ctx.get(), &_is_file_slot, file_meta_cache_ptr); init_status = hive_reader->init_reader( - _file_col_names, _push_down_conjuncts, _real_tuple_desc, + _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(), _col_name_to_slot_id, &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts); _cur_reader = std::move(hive_reader); @@ -1242,7 +1258,7 @@ Status FileScanner::_init_parquet_reader(std::unique_ptr&& parque RETURN_IF_ERROR(TableSchemaChangeHelper::BuildTableInfoUtil::by_parquet_name( _real_tuple_desc, *parquet_meta, tvf_info_node)); init_status = parquet_reader->init_reader( - _file_col_names, _push_down_conjuncts, _real_tuple_desc, + _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(), _col_name_to_slot_id, &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts, tvf_info_node); _cur_reader = std::move(parquet_reader); @@ -1272,7 +1288,7 @@ Status FileScanner::_init_parquet_reader(std::unique_ptr&& parque } init_status = parquet_reader->init_reader( - _file_col_names, _push_down_conjuncts, _real_tuple_desc, + _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(), _col_name_to_slot_id, &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts, load_info_node); _cur_reader = std::move(parquet_reader); @@ -1292,10 +1308,10 @@ Status FileScanner::_init_orc_reader(std::unique_ptr&& orc_reader, TransactionalHiveReader::create_unique(std::move(orc_reader), _profile, _state, *_params, range, _io_ctx.get(), file_meta_cache_ptr); - init_status = tran_orc_reader->init_reader(_file_col_names, _push_down_conjuncts, - _real_tuple_desc, _default_val_row_desc.get(), - &_not_single_slot_filter_conjuncts, - &_slot_id_to_filter_conjuncts); + init_status = tran_orc_reader->init_reader( + _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc, + _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts, + &_slot_id_to_filter_conjuncts); RETURN_IF_ERROR(tran_orc_reader->init_row_filters()); _cur_reader = std::move(tran_orc_reader); } else if (range.__isset.table_format_params && @@ -1305,7 +1321,7 @@ Status FileScanner::_init_orc_reader(std::unique_ptr&& orc_reader, file_meta_cache_ptr); init_status = iceberg_reader->init_reader( - _file_col_names, _push_down_conjuncts, _real_tuple_desc, + _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(), _col_name_to_slot_id, &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts); _cur_reader = std::move(iceberg_reader); @@ -1315,10 +1331,10 @@ Status FileScanner::_init_orc_reader(std::unique_ptr&& orc_reader, PaimonOrcReader::create_unique(std::move(orc_reader), _profile, _state, *_params, range, _io_ctx.get(), file_meta_cache_ptr); - init_status = paimon_reader->init_reader(_file_col_names, _push_down_conjuncts, - _real_tuple_desc, _default_val_row_desc.get(), - &_not_single_slot_filter_conjuncts, - &_slot_id_to_filter_conjuncts); + init_status = paimon_reader->init_reader( + _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc, + _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts, + &_slot_id_to_filter_conjuncts); RETURN_IF_ERROR(paimon_reader->init_row_filters()); _cur_reader = std::move(paimon_reader); } else if (range.__isset.table_format_params && @@ -1327,10 +1343,10 @@ Status FileScanner::_init_orc_reader(std::unique_ptr&& orc_reader, HudiOrcReader::create_unique(std::move(orc_reader), _profile, _state, *_params, range, _io_ctx.get(), file_meta_cache_ptr); - init_status = hudi_reader->init_reader(_file_col_names, _push_down_conjuncts, - _real_tuple_desc, _default_val_row_desc.get(), - &_not_single_slot_filter_conjuncts, - &_slot_id_to_filter_conjuncts); + init_status = hudi_reader->init_reader( + _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc, + _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts, + &_slot_id_to_filter_conjuncts); _cur_reader = std::move(hudi_reader); } else if (range.__isset.table_format_params && range.table_format_params.table_format_type == "hive") { @@ -1338,10 +1354,10 @@ Status FileScanner::_init_orc_reader(std::unique_ptr&& orc_reader, std::move(orc_reader), _profile, _state, *_params, range, _io_ctx.get(), &_is_file_slot, file_meta_cache_ptr); - init_status = hive_reader->init_reader(_file_col_names, _push_down_conjuncts, - _real_tuple_desc, _default_val_row_desc.get(), - &_not_single_slot_filter_conjuncts, - &_slot_id_to_filter_conjuncts); + init_status = hive_reader->init_reader( + _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc, + _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts, + &_slot_id_to_filter_conjuncts); _cur_reader = std::move(hive_reader); } else if (range.__isset.table_format_params && range.table_format_params.table_format_type == "tvf") { @@ -1351,10 +1367,10 @@ Status FileScanner::_init_orc_reader(std::unique_ptr&& orc_reader, std::shared_ptr tvf_info_node = nullptr; RETURN_IF_ERROR(TableSchemaChangeHelper::BuildTableInfoUtil::by_orc_name( _real_tuple_desc, orc_type_ptr, tvf_info_node)); - init_status = orc_reader->init_reader(&_file_col_names, _push_down_conjuncts, false, - _real_tuple_desc, _default_val_row_desc.get(), - &_not_single_slot_filter_conjuncts, - &_slot_id_to_filter_conjuncts, tvf_info_node); + init_status = orc_reader->init_reader( + &_file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, false, + _real_tuple_desc, _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts, + &_slot_id_to_filter_conjuncts, tvf_info_node); _cur_reader = std::move(orc_reader); } else if (_is_load) { const orc::Type* orc_type_ptr = nullptr; @@ -1376,10 +1392,10 @@ Status FileScanner::_init_orc_reader(std::unique_ptr&& orc_reader, load_info_node->add_not_exist_children(slot->col_name()); } } - init_status = orc_reader->init_reader(&_file_col_names, _push_down_conjuncts, false, - _real_tuple_desc, _default_val_row_desc.get(), - &_not_single_slot_filter_conjuncts, - &_slot_id_to_filter_conjuncts, load_info_node); + init_status = orc_reader->init_reader( + &_file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, false, + _real_tuple_desc, _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts, + &_slot_id_to_filter_conjuncts, load_info_node); _cur_reader = std::move(orc_reader); } diff --git a/be/src/vec/exec/scan/meta_scanner.cpp b/be/src/vec/exec/scan/meta_scanner.cpp index f4aafa47220a57..10dadbe39eaf77 100644 --- a/be/src/vec/exec/scan/meta_scanner.cpp +++ b/be/src/vec/exec/scan/meta_scanner.cpp @@ -72,11 +72,15 @@ Status MetaScanner::open(RuntimeState* state) { auto reader = IcebergSysTableJniReader::create_unique(_tuple_desc->slots(), state, _profile, _scan_range.meta_scan_range); RETURN_IF_ERROR(reader->init_reader()); + static_cast(reader.get()) + ->set_col_name_to_block_idx(&_src_block_name_to_idx); _reader = std::move(reader); } else if (_scan_range.meta_scan_range.metadata_type == TMetadataType::PAIMON) { auto reader = PaimonSysTableJniReader::create_unique(_tuple_desc->slots(), state, _profile, _scan_range.meta_scan_range); RETURN_IF_ERROR(reader->init_reader()); + static_cast(reader.get()) + ->set_col_name_to_block_idx(&_src_block_name_to_idx); _reader = std::move(reader); } else { RETURN_IF_ERROR(_fetch_metadata(_scan_range.meta_scan_range)); @@ -96,6 +100,12 @@ Status MetaScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof if (nullptr == state || nullptr == block || nullptr == eof) { return Status::InternalError("input is NULL pointer"); } + + // Build name to index map only once on first call + if (_src_block_name_to_idx.empty()) { + _src_block_name_to_idx = block->get_name_to_pos_map(); + } + if (_reader) { // TODO: This is a temporary workaround; the code is planned to be refactored later. size_t read_rows = 0; diff --git a/be/src/vec/exec/scan/meta_scanner.h b/be/src/vec/exec/scan/meta_scanner.h index ac36b01e5000d9..8d62ef95bd2eb6 100644 --- a/be/src/vec/exec/scan/meta_scanner.h +++ b/be/src/vec/exec/scan/meta_scanner.h @@ -97,6 +97,8 @@ class MetaScanner : public Scanner { // for reading metadata using reader from be std::unique_ptr _reader; + + std::unordered_map _src_block_name_to_idx; }; #include "common/compile_check_end.h" } // namespace doris::vectorized diff --git a/be/test/vec/exec/format/parquet/parquet_expr_test.cpp b/be/test/vec/exec/format/parquet/parquet_expr_test.cpp index b5291a52c7093f..07a1d1af4fbf29 100644 --- a/be/test/vec/exec/format/parquet/parquet_expr_test.cpp +++ b/be/test/vec/exec/format/parquet/parquet_expr_test.cpp @@ -265,8 +265,10 @@ class ParquetExprTest : public testing::Test { TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz); // auto tuple_desc = desc_tbl->get_tuple_descriptor(0); std::vector column_names; + std::unordered_map col_name_to_block_idx; for (int i = 0; i < slot_descs.size(); i++) { column_names.push_back(slot_descs[i]->col_name()); + col_name_to_block_idx[slot_descs[i]->col_name()] = i; } TFileScanRangeParams scan_params; TFileRangeDesc scan_range; @@ -279,8 +281,9 @@ class ParquetExprTest : public testing::Test { &ctz, nullptr, nullptr); p_reader->set_file_reader(local_file_reader); colname_to_slot_id.emplace("int64_col", 2); - static_cast(p_reader->init_reader(column_names, {}, tuple_desc, nullptr, - &colname_to_slot_id, nullptr, nullptr)); + static_cast(p_reader->init_reader(column_names, &col_name_to_block_idx, {}, + tuple_desc, nullptr, &colname_to_slot_id, nullptr, + nullptr)); size_t meta_size; static_cast(parse_thrift_footer(p_reader->_file_reader, &doris_file_metadata, diff --git a/be/test/vec/exec/format/parquet/parquet_read_lines.cpp b/be/test/vec/exec/format/parquet/parquet_read_lines.cpp index 280df9bced14c8..60aebd16dd4d79 100644 --- a/be/test/vec/exec/format/parquet/parquet_read_lines.cpp +++ b/be/test/vec/exec/format/parquet/parquet_read_lines.cpp @@ -124,9 +124,11 @@ static void read_parquet_lines(std::vector numeric_types, TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz); auto tuple_desc = desc_tbl->get_tuple_descriptor(0); std::vector column_names; + std::unordered_map col_name_to_block_idx; std::vector missing_column_names; for (int i = 0; i < slot_descs.size(); i++) { column_names.push_back(slot_descs[i]->col_name()); + col_name_to_block_idx[slot_descs[i]->col_name()] = i; } TFileScanRangeParams scan_params; TFileRangeDesc scan_range; @@ -149,8 +151,8 @@ static void read_parquet_lines(std::vector numeric_types, runtime_state.set_desc_tbl(desc_tbl); std::unordered_map colname_to_value_range; - static_cast( - p_reader->init_reader(column_names, {}, nullptr, nullptr, nullptr, nullptr, nullptr)); + static_cast(p_reader->init_reader(column_names, &col_name_to_block_idx, {}, nullptr, + nullptr, nullptr, nullptr, nullptr)); std::unordered_map> partition_columns; std::unordered_map missing_columns; diff --git a/be/test/vec/exec/format/parquet/parquet_reader_test.cpp b/be/test/vec/exec/format/parquet/parquet_reader_test.cpp index 1db0ea97b9fc57..7d49decbd3f7fb 100644 --- a/be/test/vec/exec/format/parquet/parquet_reader_test.cpp +++ b/be/test/vec/exec/format/parquet/parquet_reader_test.cpp @@ -134,8 +134,10 @@ TEST_F(ParquetReaderTest, normal) { TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz); auto tuple_desc = desc_tbl->get_tuple_descriptor(0); std::vector column_names; + std::unordered_map col_name_to_block_idx; for (int i = 0; i < slot_descs.size(); i++) { column_names.push_back(slot_descs[i]->col_name()); + col_name_to_block_idx[slot_descs[i]->col_name()] = i; } TFileScanRangeParams scan_params; TFileRangeDesc scan_range; @@ -149,8 +151,8 @@ TEST_F(ParquetReaderTest, normal) { RuntimeState runtime_state((TQueryGlobals())); runtime_state.set_desc_tbl(desc_tbl); - static_cast( - p_reader->init_reader(column_names, {}, nullptr, nullptr, nullptr, nullptr, nullptr)); + static_cast(p_reader->init_reader(column_names, &col_name_to_block_idx, {}, nullptr, + nullptr, nullptr, nullptr, nullptr)); std::unordered_map> partition_columns; std::unordered_map missing_columns; @@ -195,8 +197,10 @@ TEST_F(ParquetReaderTest, uuid_varbinary) { TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz); auto tuple_desc = desc_tbl->get_tuple_descriptor(0); std::vector column_names; + std::unordered_map col_name_to_block_idx; for (int i = 0; i < slot_descs.size(); i++) { column_names.push_back(slot_descs[i]->col_name()); + col_name_to_block_idx[slot_descs[i]->col_name()] = i; } TFileScanRangeParams scan_params; scan_params.enable_mapping_varbinary = true; @@ -211,7 +215,8 @@ TEST_F(ParquetReaderTest, uuid_varbinary) { RuntimeState runtime_state((TQueryGlobals())); runtime_state.set_desc_tbl(desc_tbl); - st = p_reader->init_reader(column_names, {}, nullptr, nullptr, nullptr, nullptr, nullptr); + st = p_reader->init_reader(column_names, &col_name_to_block_idx, {}, nullptr, nullptr, nullptr, + nullptr, nullptr); EXPECT_TRUE(st.ok()) << st; std::unordered_map> partition_columns; @@ -265,8 +270,10 @@ TEST_F(ParquetReaderTest, varbinary_varbinary) { TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz); auto tuple_desc = desc_tbl->get_tuple_descriptor(0); std::vector column_names; + std::unordered_map col_name_to_block_idx; for (int i = 0; i < slot_descs.size(); i++) { column_names.push_back(slot_descs[i]->col_name()); + col_name_to_block_idx[slot_descs[i]->col_name()] = i; } TFileScanRangeParams scan_params; scan_params.enable_mapping_varbinary = true; @@ -281,7 +288,8 @@ TEST_F(ParquetReaderTest, varbinary_varbinary) { RuntimeState runtime_state((TQueryGlobals())); runtime_state.set_desc_tbl(desc_tbl); - st = p_reader->init_reader(column_names, {}, nullptr, nullptr, nullptr, nullptr, nullptr); + st = p_reader->init_reader(column_names, &col_name_to_block_idx, {}, nullptr, nullptr, nullptr, + nullptr, nullptr); EXPECT_TRUE(st.ok()) << st; std::unordered_map> partition_columns; @@ -335,8 +343,10 @@ TEST_F(ParquetReaderTest, varbinary_string) { TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz); auto tuple_desc = desc_tbl->get_tuple_descriptor(0); std::vector column_names; + std::unordered_map col_name_to_block_idx; for (int i = 0; i < slot_descs.size(); i++) { column_names.push_back(slot_descs[i]->col_name()); + col_name_to_block_idx[slot_descs[i]->col_name()] = i; } TFileScanRangeParams scan_params; // use string to read parquet column, but dst type is varbinary @@ -353,7 +363,8 @@ TEST_F(ParquetReaderTest, varbinary_string) { RuntimeState runtime_state((TQueryGlobals())); runtime_state.set_desc_tbl(desc_tbl); - st = p_reader->init_reader(column_names, {}, nullptr, nullptr, nullptr, nullptr, nullptr); + st = p_reader->init_reader(column_names, &col_name_to_block_idx, {}, nullptr, nullptr, nullptr, + nullptr, nullptr); EXPECT_TRUE(st.ok()) << st; std::unordered_map> partition_columns; @@ -407,8 +418,10 @@ TEST_F(ParquetReaderTest, varbinary_string2) { TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz); auto tuple_desc = desc_tbl->get_tuple_descriptor(0); std::vector column_names; + std::unordered_map col_name_to_block_idx; for (int i = 0; i < slot_descs.size(); i++) { column_names.push_back(slot_descs[i]->col_name()); + col_name_to_block_idx[slot_descs[i]->col_name()] = i; } TFileScanRangeParams scan_params; // although want use binary column read, _cached_src_physical_type is string, so use string to read parquet column, but dst type is string @@ -425,7 +438,8 @@ TEST_F(ParquetReaderTest, varbinary_string2) { RuntimeState runtime_state((TQueryGlobals())); runtime_state.set_desc_tbl(desc_tbl); - st = p_reader->init_reader(column_names, {}, nullptr, nullptr, nullptr, nullptr, nullptr); + st = p_reader->init_reader(column_names, &col_name_to_block_idx, {}, nullptr, nullptr, nullptr, + nullptr, nullptr); EXPECT_TRUE(st.ok()) << st; std::unordered_map> partition_columns; diff --git a/be/test/vec/exec/format/table/hive/hive_reader_test.cpp b/be/test/vec/exec/format/table/hive/hive_reader_test.cpp index c831940602d687..d79f4ded7888fd 100644 --- a/be/test/vec/exec/format/table/hive/hive_reader_test.cpp +++ b/be/test/vec/exec/format/table/hive/hive_reader_test.cpp @@ -566,14 +566,15 @@ TEST_F(HiveReaderTest, read_hive_parquet_file) { VExprContextSPtrs conjuncts; // Empty conjuncts for this test std::vector table_col_names = {"name", "profile"}; + std::unordered_map col_name_to_block_idx = {{"name", 0}, {"profile", 1}}; const RowDescriptor* row_descriptor = nullptr; const std::unordered_map* colname_to_slot_id = nullptr; const VExprContextSPtrs* not_single_slot_filter_conjuncts = nullptr; const std::unordered_map* slot_id_to_filter_conjuncts = nullptr; - st = hive_reader->init_reader(table_col_names, conjuncts, tuple_descriptor, row_descriptor, - colname_to_slot_id, not_single_slot_filter_conjuncts, - slot_id_to_filter_conjuncts); + st = hive_reader->init_reader(table_col_names, &col_name_to_block_idx, conjuncts, + tuple_descriptor, row_descriptor, colname_to_slot_id, + not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts); ASSERT_TRUE(st.ok()) << st; std::unordered_map> @@ -702,11 +703,13 @@ TEST_F(HiveReaderTest, read_hive_rrc_file) { VExprContextSPtrs conjuncts; // Empty conjuncts for this test std::vector table_col_names = {"name", "profile"}; + std::unordered_map col_name_to_block_idx = {{"name", 0}, {"profile", 1}}; const RowDescriptor* row_descriptor = nullptr; const VExprContextSPtrs* not_single_slot_filter_conjuncts = nullptr; const std::unordered_map* slot_id_to_filter_conjuncts = nullptr; - st = hive_reader->init_reader(table_col_names, conjuncts, tuple_descriptor, row_descriptor, + st = hive_reader->init_reader(table_col_names, &col_name_to_block_idx, conjuncts, + tuple_descriptor, row_descriptor, not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts); ASSERT_TRUE(st.ok()) << st; diff --git a/be/test/vec/exec/format/table/iceberg/iceberg_reader_test.cpp b/be/test/vec/exec/format/table/iceberg/iceberg_reader_test.cpp index 6b41eb75f0f4e8..f82e64bb03ff33 100644 --- a/be/test/vec/exec/format/table/iceberg/iceberg_reader_test.cpp +++ b/be/test/vec/exec/format/table/iceberg/iceberg_reader_test.cpp @@ -563,14 +563,18 @@ TEST_F(IcebergReaderTest, read_iceberg_parquet_file) { VExprContextSPtrs conjuncts; // Empty conjuncts for this test std::vector table_col_names = {"name", "profile"}; + std::unordered_map col_name_to_block_idx = { + {"name", 0}, + {"profile", 1}, + }; const RowDescriptor* row_descriptor = nullptr; const std::unordered_map* colname_to_slot_id = nullptr; const VExprContextSPtrs* not_single_slot_filter_conjuncts = nullptr; const std::unordered_map* slot_id_to_filter_conjuncts = nullptr; - st = iceberg_reader->init_reader(table_col_names, conjuncts, tuple_descriptor, row_descriptor, - colname_to_slot_id, not_single_slot_filter_conjuncts, - slot_id_to_filter_conjuncts); + st = iceberg_reader->init_reader(table_col_names, &col_name_to_block_idx, conjuncts, + tuple_descriptor, row_descriptor, colname_to_slot_id, + not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts); ASSERT_TRUE(st.ok()) << st; std::unordered_map> @@ -696,12 +700,16 @@ TEST_F(IcebergReaderTest, read_iceberg_orc_file) { std::vector table_col_names = {"name", "profile"}; const RowDescriptor* row_descriptor = nullptr; const std::unordered_map* colname_to_slot_id = nullptr; + std::unordered_map col_name_to_block_idx = { + {"name", 0}, + {"profile", 1}, + }; const VExprContextSPtrs* not_single_slot_filter_conjuncts = nullptr; const std::unordered_map* slot_id_to_filter_conjuncts = nullptr; - st = iceberg_reader->init_reader(table_col_names, conjuncts, tuple_descriptor, row_descriptor, - colname_to_slot_id, not_single_slot_filter_conjuncts, - slot_id_to_filter_conjuncts); + st = iceberg_reader->init_reader(table_col_names, &col_name_to_block_idx, conjuncts, + tuple_descriptor, row_descriptor, colname_to_slot_id, + not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts); ASSERT_TRUE(st.ok()) << st; std::unordered_map> diff --git a/be/test/vec/exec/orc/orc_read_lines.cpp b/be/test/vec/exec/orc/orc_read_lines.cpp index bfb17c4d2872bd..d22bd91543e8b5 100644 --- a/be/test/vec/exec/orc/orc_read_lines.cpp +++ b/be/test/vec/exec/orc/orc_read_lines.cpp @@ -64,6 +64,10 @@ static void read_orc_line(int64_t line, std::string block_dump) { std::vector column_names = {"col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9"}; + std::unordered_map col_name_to_block_idx = { + {"col1", 0}, {"col2", 1}, {"col3", 2}, {"col4", 3}, {"col5", 4}, + {"col6", 5}, {"col7", 6}, {"col8", 7}, {"col9", 8}, + }; ObjectPool object_pool; DescriptorTblBuilder builder(&object_pool); builder.declare_tuple() << std::make_tuple( @@ -133,8 +137,8 @@ static void read_orc_line(int64_t line, std::string block_dump) { tuple_desc->slots().size()); reader->set_row_id_column_iterator(iterator_pair); - auto status = - reader->init_reader(&column_names, {}, false, tuple_desc, &row_desc, nullptr, nullptr); + auto status = reader->init_reader(&column_names, &col_name_to_block_idx, {}, false, tuple_desc, + &row_desc, nullptr, nullptr); EXPECT_TRUE(status.ok()); diff --git a/be/test/vec/exec/orc_reader_test.cpp b/be/test/vec/exec/orc_reader_test.cpp index 6bb3ce44f40754..b98c61c3c17965 100644 --- a/be/test/vec/exec/orc_reader_test.cpp +++ b/be/test/vec/exec/orc_reader_test.cpp @@ -49,6 +49,11 @@ class OrcReaderTest : public testing::Test { std::vector column_names = { "o_orderkey", "o_custkey", "o_orderstatus", "o_totalprice", "o_orderdate", "o_orderpriority", "o_clerk", "o_shippriority", "o_comment"}; + std::unordered_map col_name_to_block_idx = { + {"o_orderkey", 0}, {"o_custkey", 1}, {"o_orderstatus", 2}, + {"o_totalprice", 3}, {"o_orderdate", 4}, {"o_orderpriority", 5}, + {"o_clerk", 6}, {"o_shippriority", 7}, {"o_comment", 8}, + }; ObjectPool object_pool; DescriptorTblBuilder builder(&object_pool); builder.declare_tuple() @@ -79,8 +84,8 @@ class OrcReaderTest : public testing::Test { range.start_offset = 0; range.size = 1293; auto reader = OrcReader::create_unique(params, range, "", nullptr, &cache, true); - auto status = reader->init_reader(&column_names, {}, false, tuple_desc, &row_desc, nullptr, - nullptr); + auto status = reader->init_reader(&column_names, &col_name_to_block_idx, {}, false, + tuple_desc, &row_desc, nullptr, nullptr); EXPECT_TRUE(status.ok()); // deserialize expr