Skip to content

Commit 5f3512a

Browse files
committed
init commit.
1 parent a9884d6 commit 5f3512a

34 files changed

+383
-215
lines changed

be/src/olap/push_handler.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -398,8 +398,10 @@ Status PushBrokerReader::init() {
398398
_io_ctx->query_id = &_runtime_state->query_id();
399399

400400
auto slot_descs = desc_tbl->get_tuple_descriptor(0)->slots();
401+
uint32_t idx = 0;
401402
for (auto& slot_desc : slot_descs) {
402403
_all_col_names.push_back(to_lower((slot_desc->col_name())));
404+
_col_name_to_block_idx.insert({to_lower(slot_desc->col_name()), idx++});
403405
}
404406

405407
RETURN_IF_ERROR(_init_expr_ctxes());
@@ -646,9 +648,9 @@ Status PushBrokerReader::_get_next_reader() {
646648
_io_ctx.get(), _runtime_state.get());
647649

648650
init_status = parquet_reader->init_reader(
649-
_all_col_names, _push_down_exprs, _real_tuple_desc, _default_val_row_desc.get(),
650-
_col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
651-
&_slot_id_to_filter_conjuncts,
651+
_all_col_names, &_col_name_to_block_idx, _push_down_exprs, _real_tuple_desc,
652+
_default_val_row_desc.get(), _col_name_to_slot_id,
653+
&_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts,
652654
vectorized::TableSchemaChangeHelper::ConstNode::get_instance(), false);
653655
_cur_reader = std::move(parquet_reader);
654656
if (!init_status.ok()) {

be/src/olap/push_handler.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ class PushBrokerReader {
139139

140140
// col names from _slot_descs
141141
std::vector<std::string> _all_col_names;
142+
std::unordered_map<std::string, uint32_t> _col_name_to_block_idx;
142143
vectorized::VExprContextSPtrs _push_down_exprs;
143144
const std::unordered_map<std::string, int>* _col_name_to_slot_id;
144145
// single slot filter conjuncts

be/src/vec/exec/format/jni_reader.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,13 @@ class JniReader : public GenericReader {
6868
return Status::OK();
6969
}
7070

71+
void set_col_name_to_block_idx(
72+
const std::unordered_map<std::string, uint32_t>* col_name_to_block_idx) {
73+
if (_jni_connector) {
74+
_jni_connector->set_col_name_to_block_idx(col_name_to_block_idx);
75+
}
76+
}
77+
7178
protected:
7279
void _collect_profile_before_close() override {
7380
if (_jni_connector) {

be/src/vec/exec/format/orc/vorc_reader.cpp

Lines changed: 40 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -350,13 +350,16 @@ Status OrcReader::_create_file_reader() {
350350
}
351351

352352
Status OrcReader::init_reader(
353-
const std::vector<std::string>* column_names, const VExprContextSPtrs& conjuncts,
354-
bool is_acid, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
353+
const std::vector<std::string>* column_names,
354+
std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
355+
const VExprContextSPtrs& conjuncts, bool is_acid, const TupleDescriptor* tuple_descriptor,
356+
const RowDescriptor* row_descriptor,
355357
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
356358
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts,
357359
std::shared_ptr<TableSchemaChangeHelper::Node> table_info_node_ptr,
358360
const std::set<uint64_t>& column_ids, const std::set<uint64_t>& filter_column_ids) {
359361
_table_column_names = column_names;
362+
_col_name_to_block_idx = col_name_to_block_idx;
360363
_lazy_read_ctx.conjuncts = conjuncts;
361364
_is_acid = is_acid;
362365
_tuple_descriptor = tuple_descriptor;
@@ -1334,10 +1337,9 @@ Status OrcReader::_fill_partition_columns(
13341337
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
13351338
partition_columns) {
13361339
DataTypeSerDe::FormatOptions _text_formatOptions;
1337-
// todo: maybe do not need to build name to index map every time
1338-
auto name_to_pos_map = block->get_name_to_pos_map();
13391340
for (const auto& kv : partition_columns) {
1340-
auto col_ptr = block->get_by_position(name_to_pos_map[kv.first]).column->assume_mutable();
1341+
auto col_ptr = block->get_by_position((*_col_name_to_block_idx)[kv.first])
1342+
.column->assume_mutable();
13411343
const auto& [value, slot_desc] = kv.second;
13421344
auto text_serde = slot_desc->get_data_type_ptr()->get_serde();
13431345
Slice slice(value.data(), value.size());
@@ -1362,18 +1364,16 @@ Status OrcReader::_fill_partition_columns(
13621364
Status OrcReader::_fill_missing_columns(
13631365
Block* block, uint64_t rows,
13641366
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
1365-
// todo: maybe do not need to build name to index map every time
1366-
auto name_to_pos_map = block->get_name_to_pos_map();
13671367
std::set<size_t> positions_to_erase;
13681368
for (const auto& kv : missing_columns) {
1369-
if (!name_to_pos_map.contains(kv.first)) {
1369+
if (!_col_name_to_block_idx->contains(kv.first)) {
13701370
return Status::InternalError("Failed to find missing column: {}, block: {}", kv.first,
13711371
block->dump_structure());
13721372
}
13731373
if (kv.second == nullptr) {
13741374
// no default column, fill with null
1375-
auto mutable_column =
1376-
block->get_by_position(name_to_pos_map[kv.first]).column->assume_mutable();
1375+
auto mutable_column = block->get_by_position((*_col_name_to_block_idx)[kv.first])
1376+
.column->assume_mutable();
13771377
auto* nullable_column = static_cast<vectorized::ColumnNullable*>(mutable_column.get());
13781378
nullable_column->insert_many_defaults(rows);
13791379
} else {
@@ -1393,10 +1393,11 @@ Status OrcReader::_fill_missing_columns(
13931393
mutable_column->resize(rows);
13941394
// result_column_ptr maybe a ColumnConst, convert it to a normal column
13951395
result_column_ptr = result_column_ptr->convert_to_full_column_if_const();
1396-
auto origin_column_type = block->get_by_position(name_to_pos_map[kv.first]).type;
1396+
auto origin_column_type =
1397+
block->get_by_position((*_col_name_to_block_idx)[kv.first]).type;
13971398
bool is_nullable = origin_column_type->is_nullable();
13981399
block->replace_by_position(
1399-
name_to_pos_map[kv.first],
1400+
(*_col_name_to_block_idx)[kv.first],
14001401
is_nullable ? make_nullable(result_column_ptr) : result_column_ptr);
14011402
positions_to_erase.insert(result_column_id);
14021403
}
@@ -2234,10 +2235,9 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo
22342235
std::vector<orc::ColumnVectorBatch*> batch_vec;
22352236
_fill_batch_vec(batch_vec, _batch.get(), 0);
22362237

2237-
// todo: maybe do not need to build name to index map every time
2238-
auto name_to_pos_map = block->get_name_to_pos_map();
22392238
for (auto& col_name : _lazy_read_ctx.lazy_read_columns) {
2240-
auto& column_with_type_and_name = block->get_by_position(name_to_pos_map[col_name]);
2239+
auto& column_with_type_and_name =
2240+
block->get_by_position((*_col_name_to_block_idx)[col_name]);
22412241
auto& column_ptr = column_with_type_and_name.column;
22422242
auto& column_type = column_with_type_and_name.type;
22432243
auto file_column_name = _table_info_node_ptr->children_file_column_name(col_name);
@@ -2303,17 +2303,15 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo
23032303
}
23042304
}
23052305

2306-
// todo: maybe do not need to build name to index map every time
2307-
auto name_to_pos_map = block->get_name_to_pos_map();
23082306
if (!_dict_cols_has_converted && !_dict_filter_cols.empty()) {
23092307
for (auto& dict_filter_cols : _dict_filter_cols) {
23102308
MutableColumnPtr dict_col_ptr = ColumnInt32::create();
2311-
if (!name_to_pos_map.contains(dict_filter_cols.first)) {
2309+
if (!_col_name_to_block_idx->contains(dict_filter_cols.first)) {
23122310
return Status::InternalError(
23132311
"Failed to find dict filter column '{}' in block {}",
23142312
dict_filter_cols.first, block->dump_structure());
23152313
}
2316-
auto pos = name_to_pos_map[dict_filter_cols.first];
2314+
auto pos = (*_col_name_to_block_idx)[dict_filter_cols.first];
23172315
auto& column_with_type_and_name = block->get_by_position(pos);
23182316
auto& column_type = column_with_type_and_name.type;
23192317
if (column_type->is_nullable()) {
@@ -2335,7 +2333,8 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo
23352333
_fill_batch_vec(batch_vec, _batch.get(), 0);
23362334

23372335
for (auto& col_name : _lazy_read_ctx.all_read_columns) {
2338-
auto& column_with_type_and_name = block->get_by_position(name_to_pos_map[col_name]);
2336+
auto& column_with_type_and_name =
2337+
block->get_by_position((*_col_name_to_block_idx)[col_name]);
23392338
auto& column_ptr = column_with_type_and_name.column;
23402339
auto& column_type = column_with_type_and_name.type;
23412340
auto file_column_name = _table_info_node_ptr->children_file_column_name(col_name);
@@ -2446,17 +2445,17 @@ void OrcReader::_build_delete_row_filter(const Block* block, size_t rows) {
24462445
if (_delete_rows != nullptr) {
24472446
_delete_rows_filter_ptr = std::make_unique<IColumn::Filter>(rows, 1);
24482447
auto* __restrict _pos_delete_filter_data = _delete_rows_filter_ptr->data();
2449-
// todo: maybe do not need to build name to index map every time
2450-
auto name_to_pos_map = block->get_name_to_pos_map();
24512448
const auto& original_transaction_column = assert_cast<const ColumnInt64&>(*remove_nullable(
2452-
block->get_by_position(
2453-
name_to_pos_map[TransactionalHive::ORIGINAL_TRANSACTION_LOWER_CASE])
2449+
block->get_by_position((*_col_name_to_block_idx)
2450+
[TransactionalHive::ORIGINAL_TRANSACTION_LOWER_CASE])
24542451
.column));
24552452
const auto& bucket_id_column = assert_cast<const ColumnInt32&>(*remove_nullable(
2456-
block->get_by_position(name_to_pos_map[TransactionalHive::BUCKET_LOWER_CASE])
2453+
block->get_by_position(
2454+
(*_col_name_to_block_idx)[TransactionalHive::BUCKET_LOWER_CASE])
24572455
.column));
24582456
const auto& row_id_column = assert_cast<const ColumnInt64&>(*remove_nullable(
2459-
block->get_by_position(name_to_pos_map[TransactionalHive::ROW_ID_LOWER_CASE])
2457+
block->get_by_position(
2458+
(*_col_name_to_block_idx)[TransactionalHive::ROW_ID_LOWER_CASE])
24602459
.column));
24612460
for (int i = 0; i < rows; ++i) {
24622461
auto original_transaction = original_transaction_column.get_int(i);
@@ -2480,15 +2479,13 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
24802479
size_t origin_column_num = block->columns();
24812480

24822481
if (!_dict_cols_has_converted && !_dict_filter_cols.empty()) {
2483-
// todo: maybe do not need to build name to index map every time
2484-
auto name_to_pos_map = block->get_name_to_pos_map();
24852482
for (auto& dict_filter_cols : _dict_filter_cols) {
2486-
if (!name_to_pos_map.contains(dict_filter_cols.first)) {
2483+
if (!_col_name_to_block_idx->contains(dict_filter_cols.first)) {
24872484
return Status::InternalError("Failed to find dict filter column '{}' in block {}",
24882485
dict_filter_cols.first, block->dump_structure());
24892486
}
24902487
MutableColumnPtr dict_col_ptr = ColumnInt32::create();
2491-
auto pos = name_to_pos_map[dict_filter_cols.first];
2488+
auto pos = (*_col_name_to_block_idx)[dict_filter_cols.first];
24922489
auto& column_with_type_and_name = block->get_by_position(pos);
24932490
auto& column_type = column_with_type_and_name.type;
24942491
if (column_type->is_nullable()) {
@@ -2514,10 +2511,9 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
25142511
TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(),
25152512
TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end());
25162513
}
2517-
// todo: maybe do not need to build name to index map every time
2518-
auto name_to_pos_map = block->get_name_to_pos_map();
25192514
for (auto& table_col_name : table_col_names) {
2520-
auto& column_with_type_and_name = block->get_by_position(name_to_pos_map[table_col_name]);
2515+
auto& column_with_type_and_name =
2516+
block->get_by_position((*_col_name_to_block_idx)[table_col_name]);
25212517
auto& column_ptr = column_with_type_and_name.column;
25222518
auto& column_type = column_with_type_and_name.type;
25232519
auto file_column_name = _table_info_node_ptr->children_file_column_name(table_col_name);
@@ -2569,13 +2565,19 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
25692565
if (can_filter_all) {
25702566
for (auto& col : table_col_names) {
25712567
// clean block to read predicate columns and acid columns
2572-
block->get_by_position(name_to_pos_map[col]).column->assume_mutable()->clear();
2568+
block->get_by_position((*_col_name_to_block_idx)[col])
2569+
.column->assume_mutable()
2570+
->clear();
25732571
}
25742572
for (auto& col : _lazy_read_ctx.predicate_partition_columns) {
2575-
block->get_by_position(name_to_pos_map[col.first]).column->assume_mutable()->clear();
2573+
block->get_by_position((*_col_name_to_block_idx)[col.first])
2574+
.column->assume_mutable()
2575+
->clear();
25762576
}
25772577
for (auto& col : _lazy_read_ctx.predicate_missing_columns) {
2578-
block->get_by_position(name_to_pos_map[col.first]).column->assume_mutable()->clear();
2578+
block->get_by_position((*_col_name_to_block_idx)[col.first])
2579+
.column->assume_mutable()
2580+
->clear();
25792581
}
25802582
Block::erase_useless_column(block, origin_column_num);
25812583
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr));
@@ -2885,14 +2887,12 @@ Status OrcReader::_convert_dict_cols_to_string_cols(
28852887
return Status::OK();
28862888
}
28872889
if (!_dict_filter_cols.empty()) {
2888-
// todo: maybe do not need to build name to index map every time
2889-
auto name_to_pos_map = block->get_name_to_pos_map();
28902890
for (auto& dict_filter_cols : _dict_filter_cols) {
2891-
if (!name_to_pos_map.contains(dict_filter_cols.first)) {
2891+
if (!_col_name_to_block_idx->contains(dict_filter_cols.first)) {
28922892
return Status::InternalError("Failed to find dict filter column '{}' in block {}",
28932893
dict_filter_cols.first, block->dump_structure());
28942894
}
2895-
auto pos = name_to_pos_map[dict_filter_cols.first];
2895+
auto pos = (*_col_name_to_block_idx)[dict_filter_cols.first];
28962896
ColumnWithTypeAndName& column_with_type_and_name = block->get_by_position(pos);
28972897
const ColumnPtr& column = column_with_type_and_name.column;
28982898

be/src/vec/exec/format/orc/vorc_reader.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -157,9 +157,10 @@ class OrcReader : public GenericReader {
157157
~OrcReader() override = default;
158158
//If you want to read the file by index instead of column name, set hive_use_column_names to false.
159159
Status init_reader(
160-
const std::vector<std::string>* column_names, const VExprContextSPtrs& conjuncts,
161-
bool is_acid, const TupleDescriptor* tuple_descriptor,
162-
const RowDescriptor* row_descriptor,
160+
const std::vector<std::string>* column_names,
161+
std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
162+
const VExprContextSPtrs& conjuncts, bool is_acid,
163+
const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
163164
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
164165
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts,
165166
std::shared_ptr<TableSchemaChangeHelper::Node> table_info_node_ptr =
@@ -726,6 +727,9 @@ class OrcReader : public GenericReader {
726727
std::set<uint64_t> _column_ids;
727728
std::set<uint64_t> _filter_column_ids;
728729

730+
// Pointer to external column name to block index mapping (from FileScanner)
731+
std::unordered_map<std::string, uint32_t>* _col_name_to_block_idx = nullptr;
732+
729733
VExprSPtrs _push_down_exprs;
730734
};
731735

0 commit comments

Comments
 (0)