Skip to content

Commit 826def9

Browse files
committed
init commit.
1 parent a9884d6 commit 826def9

28 files changed

+360
-226
lines changed

be/src/olap/push_handler.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -646,9 +646,9 @@ Status PushBrokerReader::_get_next_reader() {
646646
_io_ctx.get(), _runtime_state.get());
647647

648648
init_status = parquet_reader->init_reader(
649-
_all_col_names, _push_down_exprs, _real_tuple_desc, _default_val_row_desc.get(),
650-
_col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
651-
&_slot_id_to_filter_conjuncts,
649+
_all_col_names, nullptr, _push_down_exprs, _real_tuple_desc,
650+
_default_val_row_desc.get(), _col_name_to_slot_id,
651+
&_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts,
652652
vectorized::TableSchemaChangeHelper::ConstNode::get_instance(), false);
653653
_cur_reader = std::move(parquet_reader);
654654
if (!init_status.ok()) {

be/src/vec/exec/format/jni_reader.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,13 @@ class JniReader : public GenericReader {
6868
return Status::OK();
6969
}
7070

71+
void set_col_name_to_block_idx(
72+
const std::unordered_map<std::string, uint32_t>* col_name_to_block_idx) {
73+
if (_jni_connector) {
74+
_jni_connector->set_col_name_to_block_idx(col_name_to_block_idx);
75+
}
76+
}
77+
7178
protected:
7279
void _collect_profile_before_close() override {
7380
if (_jni_connector) {

be/src/vec/exec/format/orc/vorc_reader.cpp

Lines changed: 39 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -350,8 +350,10 @@ Status OrcReader::_create_file_reader() {
350350
}
351351

352352
Status OrcReader::init_reader(
353-
const std::vector<std::string>* column_names, const VExprContextSPtrs& conjuncts,
354-
bool is_acid, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
353+
const std::vector<std::string>* column_names,
354+
std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
355+
const VExprContextSPtrs& conjuncts, bool is_acid, const TupleDescriptor* tuple_descriptor,
356+
const RowDescriptor* row_descriptor,
355357
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
356358
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts,
357359
std::shared_ptr<TableSchemaChangeHelper::Node> table_info_node_ptr,
@@ -1334,10 +1336,9 @@ Status OrcReader::_fill_partition_columns(
13341336
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
13351337
partition_columns) {
13361338
DataTypeSerDe::FormatOptions _text_formatOptions;
1337-
// todo: maybe do not need to build name to index map every time
1338-
auto name_to_pos_map = block->get_name_to_pos_map();
13391339
for (const auto& kv : partition_columns) {
1340-
auto col_ptr = block->get_by_position(name_to_pos_map[kv.first]).column->assume_mutable();
1340+
auto col_ptr = block->get_by_position((*_col_name_to_block_idx)[kv.first])
1341+
.column->assume_mutable();
13411342
const auto& [value, slot_desc] = kv.second;
13421343
auto text_serde = slot_desc->get_data_type_ptr()->get_serde();
13431344
Slice slice(value.data(), value.size());
@@ -1362,18 +1363,16 @@ Status OrcReader::_fill_partition_columns(
13621363
Status OrcReader::_fill_missing_columns(
13631364
Block* block, uint64_t rows,
13641365
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
1365-
// todo: maybe do not need to build name to index map every time
1366-
auto name_to_pos_map = block->get_name_to_pos_map();
13671366
std::set<size_t> positions_to_erase;
13681367
for (const auto& kv : missing_columns) {
1369-
if (!name_to_pos_map.contains(kv.first)) {
1368+
if (!_col_name_to_block_idx->contains(kv.first)) {
13701369
return Status::InternalError("Failed to find missing column: {}, block: {}", kv.first,
13711370
block->dump_structure());
13721371
}
13731372
if (kv.second == nullptr) {
13741373
// no default column, fill with null
1375-
auto mutable_column =
1376-
block->get_by_position(name_to_pos_map[kv.first]).column->assume_mutable();
1374+
auto mutable_column = block->get_by_position((*_col_name_to_block_idx)[kv.first])
1375+
.column->assume_mutable();
13771376
auto* nullable_column = static_cast<vectorized::ColumnNullable*>(mutable_column.get());
13781377
nullable_column->insert_many_defaults(rows);
13791378
} else {
@@ -1393,10 +1392,11 @@ Status OrcReader::_fill_missing_columns(
13931392
mutable_column->resize(rows);
13941393
// result_column_ptr maybe a ColumnConst, convert it to a normal column
13951394
result_column_ptr = result_column_ptr->convert_to_full_column_if_const();
1396-
auto origin_column_type = block->get_by_position(name_to_pos_map[kv.first]).type;
1395+
auto origin_column_type =
1396+
block->get_by_position((*_col_name_to_block_idx)[kv.first]).type;
13971397
bool is_nullable = origin_column_type->is_nullable();
13981398
block->replace_by_position(
1399-
name_to_pos_map[kv.first],
1399+
(*_col_name_to_block_idx)[kv.first],
14001400
is_nullable ? make_nullable(result_column_ptr) : result_column_ptr);
14011401
positions_to_erase.insert(result_column_id);
14021402
}
@@ -2234,10 +2234,9 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo
22342234
std::vector<orc::ColumnVectorBatch*> batch_vec;
22352235
_fill_batch_vec(batch_vec, _batch.get(), 0);
22362236

2237-
// todo: maybe do not need to build name to index map every time
2238-
auto name_to_pos_map = block->get_name_to_pos_map();
22392237
for (auto& col_name : _lazy_read_ctx.lazy_read_columns) {
2240-
auto& column_with_type_and_name = block->get_by_position(name_to_pos_map[col_name]);
2238+
auto& column_with_type_and_name =
2239+
block->get_by_position((*_col_name_to_block_idx)[col_name]);
22412240
auto& column_ptr = column_with_type_and_name.column;
22422241
auto& column_type = column_with_type_and_name.type;
22432242
auto file_column_name = _table_info_node_ptr->children_file_column_name(col_name);
@@ -2303,17 +2302,15 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo
23032302
}
23042303
}
23052304

2306-
// todo: maybe do not need to build name to index map every time
2307-
auto name_to_pos_map = block->get_name_to_pos_map();
23082305
if (!_dict_cols_has_converted && !_dict_filter_cols.empty()) {
23092306
for (auto& dict_filter_cols : _dict_filter_cols) {
23102307
MutableColumnPtr dict_col_ptr = ColumnInt32::create();
2311-
if (!name_to_pos_map.contains(dict_filter_cols.first)) {
2308+
if (!_col_name_to_block_idx->contains(dict_filter_cols.first)) {
23122309
return Status::InternalError(
23132310
"Failed to find dict filter column '{}' in block {}",
23142311
dict_filter_cols.first, block->dump_structure());
23152312
}
2316-
auto pos = name_to_pos_map[dict_filter_cols.first];
2313+
auto pos = (*_col_name_to_block_idx)[dict_filter_cols.first];
23172314
auto& column_with_type_and_name = block->get_by_position(pos);
23182315
auto& column_type = column_with_type_and_name.type;
23192316
if (column_type->is_nullable()) {
@@ -2335,7 +2332,8 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo
23352332
_fill_batch_vec(batch_vec, _batch.get(), 0);
23362333

23372334
for (auto& col_name : _lazy_read_ctx.all_read_columns) {
2338-
auto& column_with_type_and_name = block->get_by_position(name_to_pos_map[col_name]);
2335+
auto& column_with_type_and_name =
2336+
block->get_by_position((*_col_name_to_block_idx)[col_name]);
23392337
auto& column_ptr = column_with_type_and_name.column;
23402338
auto& column_type = column_with_type_and_name.type;
23412339
auto file_column_name = _table_info_node_ptr->children_file_column_name(col_name);
@@ -2446,17 +2444,17 @@ void OrcReader::_build_delete_row_filter(const Block* block, size_t rows) {
24462444
if (_delete_rows != nullptr) {
24472445
_delete_rows_filter_ptr = std::make_unique<IColumn::Filter>(rows, 1);
24482446
auto* __restrict _pos_delete_filter_data = _delete_rows_filter_ptr->data();
2449-
// todo: maybe do not need to build name to index map every time
2450-
auto name_to_pos_map = block->get_name_to_pos_map();
24512447
const auto& original_transaction_column = assert_cast<const ColumnInt64&>(*remove_nullable(
2452-
block->get_by_position(
2453-
name_to_pos_map[TransactionalHive::ORIGINAL_TRANSACTION_LOWER_CASE])
2448+
block->get_by_position((*_col_name_to_block_idx)
2449+
[TransactionalHive::ORIGINAL_TRANSACTION_LOWER_CASE])
24542450
.column));
24552451
const auto& bucket_id_column = assert_cast<const ColumnInt32&>(*remove_nullable(
2456-
block->get_by_position(name_to_pos_map[TransactionalHive::BUCKET_LOWER_CASE])
2452+
block->get_by_position(
2453+
(*_col_name_to_block_idx)[TransactionalHive::BUCKET_LOWER_CASE])
24572454
.column));
24582455
const auto& row_id_column = assert_cast<const ColumnInt64&>(*remove_nullable(
2459-
block->get_by_position(name_to_pos_map[TransactionalHive::ROW_ID_LOWER_CASE])
2456+
block->get_by_position(
2457+
(*_col_name_to_block_idx)[TransactionalHive::ROW_ID_LOWER_CASE])
24602458
.column));
24612459
for (int i = 0; i < rows; ++i) {
24622460
auto original_transaction = original_transaction_column.get_int(i);
@@ -2480,15 +2478,13 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
24802478
size_t origin_column_num = block->columns();
24812479

24822480
if (!_dict_cols_has_converted && !_dict_filter_cols.empty()) {
2483-
// todo: maybe do not need to build name to index map every time
2484-
auto name_to_pos_map = block->get_name_to_pos_map();
24852481
for (auto& dict_filter_cols : _dict_filter_cols) {
2486-
if (!name_to_pos_map.contains(dict_filter_cols.first)) {
2482+
if (!_col_name_to_block_idx->contains(dict_filter_cols.first)) {
24872483
return Status::InternalError("Failed to find dict filter column '{}' in block {}",
24882484
dict_filter_cols.first, block->dump_structure());
24892485
}
24902486
MutableColumnPtr dict_col_ptr = ColumnInt32::create();
2491-
auto pos = name_to_pos_map[dict_filter_cols.first];
2487+
auto pos = (*_col_name_to_block_idx)[dict_filter_cols.first];
24922488
auto& column_with_type_and_name = block->get_by_position(pos);
24932489
auto& column_type = column_with_type_and_name.type;
24942490
if (column_type->is_nullable()) {
@@ -2514,10 +2510,9 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
25142510
TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(),
25152511
TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end());
25162512
}
2517-
// todo: maybe do not need to build name to index map every time
2518-
auto name_to_pos_map = block->get_name_to_pos_map();
25192513
for (auto& table_col_name : table_col_names) {
2520-
auto& column_with_type_and_name = block->get_by_position(name_to_pos_map[table_col_name]);
2514+
auto& column_with_type_and_name =
2515+
block->get_by_position((*_col_name_to_block_idx)[table_col_name]);
25212516
auto& column_ptr = column_with_type_and_name.column;
25222517
auto& column_type = column_with_type_and_name.type;
25232518
auto file_column_name = _table_info_node_ptr->children_file_column_name(table_col_name);
@@ -2569,13 +2564,19 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
25692564
if (can_filter_all) {
25702565
for (auto& col : table_col_names) {
25712566
// clean block to read predicate columns and acid columns
2572-
block->get_by_position(name_to_pos_map[col]).column->assume_mutable()->clear();
2567+
block->get_by_position((*_col_name_to_block_idx)[col])
2568+
.column->assume_mutable()
2569+
->clear();
25732570
}
25742571
for (auto& col : _lazy_read_ctx.predicate_partition_columns) {
2575-
block->get_by_position(name_to_pos_map[col.first]).column->assume_mutable()->clear();
2572+
block->get_by_position((*_col_name_to_block_idx)[col.first])
2573+
.column->assume_mutable()
2574+
->clear();
25762575
}
25772576
for (auto& col : _lazy_read_ctx.predicate_missing_columns) {
2578-
block->get_by_position(name_to_pos_map[col.first]).column->assume_mutable()->clear();
2577+
block->get_by_position((*_col_name_to_block_idx)[col.first])
2578+
.column->assume_mutable()
2579+
->clear();
25792580
}
25802581
Block::erase_useless_column(block, origin_column_num);
25812582
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr));
@@ -2885,14 +2886,12 @@ Status OrcReader::_convert_dict_cols_to_string_cols(
28852886
return Status::OK();
28862887
}
28872888
if (!_dict_filter_cols.empty()) {
2888-
// todo: maybe do not need to build name to index map every time
2889-
auto name_to_pos_map = block->get_name_to_pos_map();
28902889
for (auto& dict_filter_cols : _dict_filter_cols) {
2891-
if (!name_to_pos_map.contains(dict_filter_cols.first)) {
2890+
if (!_col_name_to_block_idx->contains(dict_filter_cols.first)) {
28922891
return Status::InternalError("Failed to find dict filter column '{}' in block {}",
28932892
dict_filter_cols.first, block->dump_structure());
28942893
}
2895-
auto pos = name_to_pos_map[dict_filter_cols.first];
2894+
auto pos = (*_col_name_to_block_idx)[dict_filter_cols.first];
28962895
ColumnWithTypeAndName& column_with_type_and_name = block->get_by_position(pos);
28972896
const ColumnPtr& column = column_with_type_and_name.column;
28982897

be/src/vec/exec/format/orc/vorc_reader.h

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -157,9 +157,10 @@ class OrcReader : public GenericReader {
157157
~OrcReader() override = default;
158158
//If you want to read the file by index instead of column name, set hive_use_column_names to false.
159159
Status init_reader(
160-
const std::vector<std::string>* column_names, const VExprContextSPtrs& conjuncts,
161-
bool is_acid, const TupleDescriptor* tuple_descriptor,
162-
const RowDescriptor* row_descriptor,
160+
const std::vector<std::string>* column_names,
161+
std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
162+
const VExprContextSPtrs& conjuncts, bool is_acid,
163+
const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
163164
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
164165
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts,
165166
std::shared_ptr<TableSchemaChangeHelper::Node> table_info_node_ptr =
@@ -726,6 +727,10 @@ class OrcReader : public GenericReader {
726727
std::set<uint64_t> _column_ids;
727728
std::set<uint64_t> _filter_column_ids;
728729

730+
// Pointer to external column name to block index mapping (from FileScanner)
731+
// This allows IcebergTableReader to dynamically add expand columns
732+
std::unordered_map<std::string, uint32_t>* _col_name_to_block_idx = nullptr;
733+
729734
VExprSPtrs _push_down_exprs;
730735
};
731736

0 commit comments

Comments
 (0)