Skip to content

Commit 056fbe4

Browse files
committed
[Opt](multi-catalog) Optimize by avoiding building the name_to_index map every time. (#58679)
Related PR: #58124
1 parent 5ed90c9 commit 056fbe4

36 files changed

+373
-205
lines changed

be/src/olap/push_handler.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -407,8 +407,10 @@ Status PushBrokerReader::init() {
407407
_io_ctx->query_id = &_runtime_state->query_id();
408408

409409
auto slot_descs = desc_tbl->get_tuple_descriptor(0)->slots();
410+
uint32_t idx = 0;
410411
for (auto& slot_desc : slot_descs) {
411412
_all_col_names.push_back(to_lower((slot_desc->col_name())));
413+
_col_name_to_block_idx.insert({to_lower(slot_desc->col_name()), idx++});
412414
}
413415

414416
RETURN_IF_ERROR(_init_expr_ctxes());
@@ -656,8 +658,8 @@ Status PushBrokerReader::_get_next_reader() {
656658
_io_ctx.get(), _runtime_state.get());
657659

658660
init_status = parquet_reader->init_reader(
659-
_all_col_names, _colname_to_value_range, _push_down_exprs, _real_tuple_desc,
660-
_default_val_row_desc.get(), _col_name_to_slot_id,
661+
_all_col_names, &_col_name_to_block_idx, _colname_to_value_range, _push_down_exprs,
662+
_real_tuple_desc, _default_val_row_desc.get(), _col_name_to_slot_id,
661663
&_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts,
662664
vectorized::TableSchemaChangeHelper::ConstNode::get_instance(), false);
663665
_cur_reader = std::move(parquet_reader);

be/src/olap/push_handler.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ class PushBrokerReader {
141141

142142
// col names from _slot_descs
143143
std::vector<std::string> _all_col_names;
144+
std::unordered_map<std::string, uint32_t> _col_name_to_block_idx;
144145
std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
145146
vectorized::VExprContextSPtrs _push_down_exprs;
146147
const std::unordered_map<std::string, int>* _col_name_to_slot_id;

be/src/vec/exec/format/jni_reader.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,13 @@ class JniReader : public GenericReader {
6868
return Status::OK();
6969
}
7070

71+
// Forwards the column-name -> block-index map pointer to the JNI connector.
// The pointer is passed through unchanged; this method does not store it itself.
void set_col_name_to_block_idx(
72+
const std::unordered_map<std::string, uint32_t>* col_name_to_block_idx) {
73+
// No-op when no connector exists: the pointer is then silently dropped.
if (_jni_connector) {
74+
_jni_connector->set_col_name_to_block_idx(col_name_to_block_idx);
75+
}
76+
}
77+
7178
protected:
7279
void _collect_profile_before_close() override {
7380
if (_jni_connector) {

be/src/vec/exec/format/orc/vorc_reader.cpp

Lines changed: 37 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,7 @@ Status OrcReader::_create_file_reader() {
357357

358358
Status OrcReader::init_reader(
359359
const std::vector<std::string>* column_names,
360+
std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
360361
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
361362
const VExprContextSPtrs& conjuncts, bool is_acid, const TupleDescriptor* tuple_descriptor,
362363
const RowDescriptor* row_descriptor,
@@ -365,6 +366,7 @@ Status OrcReader::init_reader(
365366
std::shared_ptr<TableSchemaChangeHelper::Node> table_info_node_ptr,
366367
const std::set<uint64_t>& column_ids, const std::set<uint64_t>& filter_column_ids) {
367368
_table_column_names = column_names;
369+
_col_name_to_block_idx = col_name_to_block_idx;
368370
_colname_to_value_range = colname_to_value_range;
369371
_lazy_read_ctx.conjuncts = conjuncts;
370372
_is_acid = is_acid;
@@ -1331,10 +1333,9 @@ Status OrcReader::_fill_partition_columns(
13311333
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
13321334
partition_columns) {
13331335
DataTypeSerDe::FormatOptions _text_formatOptions;
1334-
// todo: maybe do not need to build name to index map every time
1335-
auto name_to_pos_map = block->get_name_to_pos_map();
13361336
for (const auto& kv : partition_columns) {
1337-
auto col_ptr = block->get_by_position(name_to_pos_map[kv.first]).column->assume_mutable();
1337+
auto col_ptr = block->get_by_position((*_col_name_to_block_idx)[kv.first])
1338+
.column->assume_mutable();
13381339
const auto& [value, slot_desc] = kv.second;
13391340
auto _text_serde = slot_desc->get_data_type_ptr()->get_serde();
13401341
Slice slice(value.data(), value.size());
@@ -1360,18 +1361,16 @@ Status OrcReader::_fill_partition_columns(
13601361
Status OrcReader::_fill_missing_columns(
13611362
Block* block, uint64_t rows,
13621363
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
1363-
// todo: maybe do not need to build name to index map every time
1364-
auto name_to_pos_map = block->get_name_to_pos_map();
13651364
std::set<size_t> positions_to_erase;
13661365
for (const auto& kv : missing_columns) {
1367-
if (!name_to_pos_map.contains(kv.first)) {
1366+
if (!_col_name_to_block_idx->contains(kv.first)) {
13681367
return Status::InternalError("Failed to find missing column: {}, block: {}", kv.first,
13691368
block->dump_structure());
13701369
}
13711370
if (kv.second == nullptr) {
13721371
// no default column, fill with null
1373-
auto mutable_column =
1374-
block->get_by_position(name_to_pos_map[kv.first]).column->assume_mutable();
1372+
auto mutable_column = block->get_by_position((*_col_name_to_block_idx)[kv.first])
1373+
.column->assume_mutable();
13751374
auto* nullable_column = static_cast<vectorized::ColumnNullable*>(mutable_column.get());
13761375
nullable_column->insert_many_defaults(rows);
13771376
} else {
@@ -1391,10 +1390,11 @@ Status OrcReader::_fill_missing_columns(
13911390
mutable_column->resize(rows);
13921391
// result_column_ptr maybe a ColumnConst, convert it to a normal column
13931392
result_column_ptr = result_column_ptr->convert_to_full_column_if_const();
1394-
auto origin_column_type = block->get_by_position(name_to_pos_map[kv.first]).type;
1393+
auto origin_column_type =
1394+
block->get_by_position((*_col_name_to_block_idx)[kv.first]).type;
13951395
bool is_nullable = origin_column_type->is_nullable();
13961396
block->replace_by_position(
1397-
name_to_pos_map[kv.first],
1397+
(*_col_name_to_block_idx)[kv.first],
13981398
is_nullable ? make_nullable(result_column_ptr) : result_column_ptr);
13991399
positions_to_erase.insert(result_column_id);
14001400
}
@@ -2270,10 +2270,9 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo
22702270
std::vector<orc::ColumnVectorBatch*> batch_vec;
22712271
_fill_batch_vec(batch_vec, _batch.get(), 0);
22722272

2273-
// todo: maybe do not need to build name to index map every time
2274-
auto name_to_pos_map = block->get_name_to_pos_map();
22752273
for (auto& col_name : _lazy_read_ctx.lazy_read_columns) {
2276-
auto& column_with_type_and_name = block->get_by_position(name_to_pos_map[col_name]);
2274+
auto& column_with_type_and_name =
2275+
block->get_by_position((*_col_name_to_block_idx)[col_name]);
22772276
auto& column_ptr = column_with_type_and_name.column;
22782277
auto& column_type = column_with_type_and_name.type;
22792278
auto file_column_name = _table_info_node_ptr->children_file_column_name(col_name);
@@ -2339,17 +2338,15 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo
23392338
}
23402339
}
23412340

2342-
// todo: maybe do not need to build name to index map every time
2343-
auto name_to_pos_map = block->get_name_to_pos_map();
23442341
if (!_dict_cols_has_converted && !_dict_filter_cols.empty()) {
23452342
for (auto& dict_filter_cols : _dict_filter_cols) {
23462343
MutableColumnPtr dict_col_ptr = ColumnInt32::create();
2347-
if (!name_to_pos_map.contains(dict_filter_cols.first)) {
2344+
if (!_col_name_to_block_idx->contains(dict_filter_cols.first)) {
23482345
return Status::InternalError(
23492346
"Failed to find dict filter column '{}' in block {}",
23502347
dict_filter_cols.first, block->dump_structure());
23512348
}
2352-
auto pos = name_to_pos_map[dict_filter_cols.first];
2349+
auto pos = (*_col_name_to_block_idx)[dict_filter_cols.first];
23532350
auto& column_with_type_and_name = block->get_by_position(pos);
23542351
auto& column_type = column_with_type_and_name.type;
23552352
if (column_type->is_nullable()) {
@@ -2371,7 +2368,8 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo
23712368
_fill_batch_vec(batch_vec, _batch.get(), 0);
23722369

23732370
for (auto& col_name : _lazy_read_ctx.all_read_columns) {
2374-
auto& column_with_type_and_name = block->get_by_position(name_to_pos_map[col_name]);
2371+
auto& column_with_type_and_name =
2372+
block->get_by_position((*_col_name_to_block_idx)[col_name]);
23752373
auto& column_ptr = column_with_type_and_name.column;
23762374
auto& column_type = column_with_type_and_name.type;
23772375
auto file_column_name = _table_info_node_ptr->children_file_column_name(col_name);
@@ -2482,17 +2480,17 @@ void OrcReader::_build_delete_row_filter(const Block* block, size_t rows) {
24822480
if (_delete_rows != nullptr) {
24832481
_delete_rows_filter_ptr = std::make_unique<IColumn::Filter>(rows, 1);
24842482
auto* __restrict _pos_delete_filter_data = _delete_rows_filter_ptr->data();
2485-
// todo: maybe do not need to build name to index map every time
2486-
auto name_to_pos_map = block->get_name_to_pos_map();
24872483
const auto& original_transaction_column = assert_cast<const ColumnInt64&>(*remove_nullable(
2488-
block->get_by_position(
2489-
name_to_pos_map[TransactionalHive::ORIGINAL_TRANSACTION_LOWER_CASE])
2484+
block->get_by_position((*_col_name_to_block_idx)
2485+
[TransactionalHive::ORIGINAL_TRANSACTION_LOWER_CASE])
24902486
.column));
24912487
const auto& bucket_id_column = assert_cast<const ColumnInt32&>(*remove_nullable(
2492-
block->get_by_position(name_to_pos_map[TransactionalHive::BUCKET_LOWER_CASE])
2488+
block->get_by_position(
2489+
(*_col_name_to_block_idx)[TransactionalHive::BUCKET_LOWER_CASE])
24932490
.column));
24942491
const auto& row_id_column = assert_cast<const ColumnInt64&>(*remove_nullable(
2495-
block->get_by_position(name_to_pos_map[TransactionalHive::ROW_ID_LOWER_CASE])
2492+
block->get_by_position(
2493+
(*_col_name_to_block_idx)[TransactionalHive::ROW_ID_LOWER_CASE])
24962494
.column));
24972495
for (int i = 0; i < rows; ++i) {
24982496
auto original_transaction = original_transaction_column.get_int(i);
@@ -2516,15 +2514,13 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
25162514
size_t origin_column_num = block->columns();
25172515

25182516
if (!_dict_cols_has_converted && !_dict_filter_cols.empty()) {
2519-
// todo: maybe do not need to build name to index map every time
2520-
auto name_to_pos_map = block->get_name_to_pos_map();
25212517
for (auto& dict_filter_cols : _dict_filter_cols) {
2522-
if (!name_to_pos_map.contains(dict_filter_cols.first)) {
2518+
if (!_col_name_to_block_idx->contains(dict_filter_cols.first)) {
25232519
return Status::InternalError("Failed to find dict filter column '{}' in block {}",
25242520
dict_filter_cols.first, block->dump_structure());
25252521
}
25262522
MutableColumnPtr dict_col_ptr = ColumnInt32::create();
2527-
auto pos = name_to_pos_map[dict_filter_cols.first];
2523+
auto pos = (*_col_name_to_block_idx)[dict_filter_cols.first];
25282524
auto& column_with_type_and_name = block->get_by_position(pos);
25292525
auto& column_type = column_with_type_and_name.type;
25302526
if (column_type->is_nullable()) {
@@ -2550,10 +2546,9 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
25502546
TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(),
25512547
TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end());
25522548
}
2553-
// todo: maybe do not need to build name to index map every time
2554-
auto name_to_pos_map = block->get_name_to_pos_map();
25552549
for (auto& table_col_name : table_col_names) {
2556-
auto& column_with_type_and_name = block->get_by_position(name_to_pos_map[table_col_name]);
2550+
auto& column_with_type_and_name =
2551+
block->get_by_position((*_col_name_to_block_idx)[table_col_name]);
25572552
auto& column_ptr = column_with_type_and_name.column;
25582553
auto& column_type = column_with_type_and_name.type;
25592554
auto file_column_name = _table_info_node_ptr->children_file_column_name(table_col_name);
@@ -2605,13 +2600,19 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
26052600
if (can_filter_all) {
26062601
for (auto& col : table_col_names) {
26072602
// clean block to read predicate columns and acid columns
2608-
block->get_by_position(name_to_pos_map[col]).column->assume_mutable()->clear();
2603+
block->get_by_position((*_col_name_to_block_idx)[col])
2604+
.column->assume_mutable()
2605+
->clear();
26092606
}
26102607
for (auto& col : _lazy_read_ctx.predicate_partition_columns) {
2611-
block->get_by_position(name_to_pos_map[col.first]).column->assume_mutable()->clear();
2608+
block->get_by_position((*_col_name_to_block_idx)[col.first])
2609+
.column->assume_mutable()
2610+
->clear();
26122611
}
26132612
for (auto& col : _lazy_read_ctx.predicate_missing_columns) {
2614-
block->get_by_position(name_to_pos_map[col.first]).column->assume_mutable()->clear();
2613+
block->get_by_position((*_col_name_to_block_idx)[col.first])
2614+
.column->assume_mutable()
2615+
->clear();
26152616
}
26162617
Block::erase_useless_column(block, origin_column_num);
26172618
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr));
@@ -2921,14 +2922,12 @@ Status OrcReader::_convert_dict_cols_to_string_cols(
29212922
return Status::OK();
29222923
}
29232924
if (!_dict_filter_cols.empty()) {
2924-
// todo: maybe do not need to build name to index map every time
2925-
auto name_to_pos_map = block->get_name_to_pos_map();
29262925
for (auto& dict_filter_cols : _dict_filter_cols) {
2927-
if (!name_to_pos_map.contains(dict_filter_cols.first)) {
2926+
if (!_col_name_to_block_idx->contains(dict_filter_cols.first)) {
29282927
return Status::InternalError("Failed to find dict filter column '{}' in block {}",
29292928
dict_filter_cols.first, block->dump_structure());
29302929
}
2931-
auto pos = name_to_pos_map[dict_filter_cols.first];
2930+
auto pos = (*_col_name_to_block_idx)[dict_filter_cols.first];
29322931
ColumnWithTypeAndName& column_with_type_and_name = block->get_by_position(pos);
29332932
const ColumnPtr& column = column_with_type_and_name.column;
29342933

be/src/vec/exec/format/orc/vorc_reader.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ class OrcReader : public GenericReader {
158158
//If you want to read the file by index instead of column name, set hive_use_column_names to false.
159159
Status init_reader(
160160
const std::vector<std::string>* column_names,
161+
std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
161162
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
162163
const VExprContextSPtrs& conjuncts, bool is_acid,
163164
const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
@@ -728,6 +729,9 @@ class OrcReader : public GenericReader {
728729
std::set<uint64_t> _column_ids;
729730
std::set<uint64_t> _filter_column_ids;
730731

732+
// Pointer to external column name to block index mapping (from FileScanner)
733+
std::unordered_map<std::string, uint32_t>* _col_name_to_block_idx = nullptr;
734+
731735
VExprSPtrs _push_down_exprs;
732736
};
733737

0 commit comments

Comments
 (0)