Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,10 @@ Status PushBrokerReader::init() {
_io_ctx->query_id = &_runtime_state->query_id();

auto slot_descs = desc_tbl->get_tuple_descriptor(0)->slots();
uint32_t idx = 0;
for (auto& slot_desc : slot_descs) {
_all_col_names.push_back(to_lower((slot_desc->col_name())));
_col_name_to_block_idx.insert({to_lower(slot_desc->col_name()), idx++});
}

RETURN_IF_ERROR(_init_expr_ctxes());
Expand Down Expand Up @@ -646,9 +648,9 @@ Status PushBrokerReader::_get_next_reader() {
_io_ctx.get(), _runtime_state.get());

init_status = parquet_reader->init_reader(
_all_col_names, _push_down_exprs, _real_tuple_desc, _default_val_row_desc.get(),
_col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
&_slot_id_to_filter_conjuncts,
_all_col_names, &_col_name_to_block_idx, _push_down_exprs, _real_tuple_desc,
_default_val_row_desc.get(), _col_name_to_slot_id,
&_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts,
vectorized::TableSchemaChangeHelper::ConstNode::get_instance(), false);
_cur_reader = std::move(parquet_reader);
if (!init_status.ok()) {
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/push_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ class PushBrokerReader {

// col names from _slot_descs
std::vector<std::string> _all_col_names;
std::unordered_map<std::string, uint32_t> _col_name_to_block_idx;
vectorized::VExprContextSPtrs _push_down_exprs;
const std::unordered_map<std::string, int>* _col_name_to_slot_id;
// single slot filter conjuncts
Expand Down
7 changes: 7 additions & 0 deletions be/src/vec/exec/format/jni_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ class JniReader : public GenericReader {
return Status::OK();
}

// Hands the column-name -> block-index map pointer to the JNI connector.
// When no connector is present the call is a no-op and the pointer is
// not retained by this reader.
void set_col_name_to_block_idx(
        const std::unordered_map<std::string, uint32_t>* col_name_to_block_idx) {
    if (!_jni_connector) {
        return;
    }
    _jni_connector->set_col_name_to_block_idx(col_name_to_block_idx);
}

protected:
void _collect_profile_before_close() override {
if (_jni_connector) {
Expand Down
80 changes: 40 additions & 40 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,13 +350,16 @@ Status OrcReader::_create_file_reader() {
}

Status OrcReader::init_reader(
const std::vector<std::string>* column_names, const VExprContextSPtrs& conjuncts,
bool is_acid, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
const std::vector<std::string>* column_names,
std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
const VExprContextSPtrs& conjuncts, bool is_acid, const TupleDescriptor* tuple_descriptor,
const RowDescriptor* row_descriptor,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts,
std::shared_ptr<TableSchemaChangeHelper::Node> table_info_node_ptr,
const std::set<uint64_t>& column_ids, const std::set<uint64_t>& filter_column_ids) {
_table_column_names = column_names;
_col_name_to_block_idx = col_name_to_block_idx;
_lazy_read_ctx.conjuncts = conjuncts;
_is_acid = is_acid;
_tuple_descriptor = tuple_descriptor;
Expand Down Expand Up @@ -1334,10 +1337,9 @@ Status OrcReader::_fill_partition_columns(
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
partition_columns) {
DataTypeSerDe::FormatOptions _text_formatOptions;
// todo: maybe do not need to build name to index map every time
auto name_to_pos_map = block->get_name_to_pos_map();
for (const auto& kv : partition_columns) {
auto col_ptr = block->get_by_position(name_to_pos_map[kv.first]).column->assume_mutable();
auto col_ptr = block->get_by_position((*_col_name_to_block_idx)[kv.first])
.column->assume_mutable();
Comment on lines +1341 to +1342
Copy link

Copilot AI Dec 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential null pointer dereference. _col_name_to_block_idx is dereferenced without null checking. If not properly initialized, this will crash. Add a null check or ensure it's always set before use.

Copilot uses AI. Check for mistakes.
const auto& [value, slot_desc] = kv.second;
auto text_serde = slot_desc->get_data_type_ptr()->get_serde();
Slice slice(value.data(), value.size());
Expand All @@ -1362,18 +1364,16 @@ Status OrcReader::_fill_partition_columns(
Status OrcReader::_fill_missing_columns(
Block* block, uint64_t rows,
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
// todo: maybe do not need to build name to index map every time
auto name_to_pos_map = block->get_name_to_pos_map();
std::set<size_t> positions_to_erase;
for (const auto& kv : missing_columns) {
if (!name_to_pos_map.contains(kv.first)) {
if (!_col_name_to_block_idx->contains(kv.first)) {
return Status::InternalError("Failed to find missing column: {}, block: {}", kv.first,
block->dump_structure());
}
if (kv.second == nullptr) {
// no default column, fill with null
auto mutable_column =
block->get_by_position(name_to_pos_map[kv.first]).column->assume_mutable();
auto mutable_column = block->get_by_position((*_col_name_to_block_idx)[kv.first])
.column->assume_mutable();
auto* nullable_column = static_cast<vectorized::ColumnNullable*>(mutable_column.get());
nullable_column->insert_many_defaults(rows);
} else {
Expand All @@ -1393,10 +1393,11 @@ Status OrcReader::_fill_missing_columns(
mutable_column->resize(rows);
// result_column_ptr maybe a ColumnConst, convert it to a normal column
result_column_ptr = result_column_ptr->convert_to_full_column_if_const();
auto origin_column_type = block->get_by_position(name_to_pos_map[kv.first]).type;
auto origin_column_type =
block->get_by_position((*_col_name_to_block_idx)[kv.first]).type;
bool is_nullable = origin_column_type->is_nullable();
block->replace_by_position(
name_to_pos_map[kv.first],
(*_col_name_to_block_idx)[kv.first],
is_nullable ? make_nullable(result_column_ptr) : result_column_ptr);
positions_to_erase.insert(result_column_id);
}
Expand Down Expand Up @@ -2234,10 +2235,9 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo
std::vector<orc::ColumnVectorBatch*> batch_vec;
_fill_batch_vec(batch_vec, _batch.get(), 0);

// todo: maybe do not need to build name to index map every time
auto name_to_pos_map = block->get_name_to_pos_map();
for (auto& col_name : _lazy_read_ctx.lazy_read_columns) {
auto& column_with_type_and_name = block->get_by_position(name_to_pos_map[col_name]);
auto& column_with_type_and_name =
block->get_by_position((*_col_name_to_block_idx)[col_name]);
auto& column_ptr = column_with_type_and_name.column;
auto& column_type = column_with_type_and_name.type;
auto file_column_name = _table_info_node_ptr->children_file_column_name(col_name);
Expand Down Expand Up @@ -2303,17 +2303,15 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo
}
}

// todo: maybe do not need to build name to index map every time
auto name_to_pos_map = block->get_name_to_pos_map();
if (!_dict_cols_has_converted && !_dict_filter_cols.empty()) {
for (auto& dict_filter_cols : _dict_filter_cols) {
MutableColumnPtr dict_col_ptr = ColumnInt32::create();
if (!name_to_pos_map.contains(dict_filter_cols.first)) {
if (!_col_name_to_block_idx->contains(dict_filter_cols.first)) {
return Status::InternalError(
"Failed to find dict filter column '{}' in block {}",
dict_filter_cols.first, block->dump_structure());
}
auto pos = name_to_pos_map[dict_filter_cols.first];
auto pos = (*_col_name_to_block_idx)[dict_filter_cols.first];
auto& column_with_type_and_name = block->get_by_position(pos);
auto& column_type = column_with_type_and_name.type;
if (column_type->is_nullable()) {
Expand All @@ -2335,7 +2333,8 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo
_fill_batch_vec(batch_vec, _batch.get(), 0);

for (auto& col_name : _lazy_read_ctx.all_read_columns) {
auto& column_with_type_and_name = block->get_by_position(name_to_pos_map[col_name]);
auto& column_with_type_and_name =
block->get_by_position((*_col_name_to_block_idx)[col_name]);
auto& column_ptr = column_with_type_and_name.column;
auto& column_type = column_with_type_and_name.type;
auto file_column_name = _table_info_node_ptr->children_file_column_name(col_name);
Expand Down Expand Up @@ -2446,17 +2445,17 @@ void OrcReader::_build_delete_row_filter(const Block* block, size_t rows) {
if (_delete_rows != nullptr) {
_delete_rows_filter_ptr = std::make_unique<IColumn::Filter>(rows, 1);
auto* __restrict _pos_delete_filter_data = _delete_rows_filter_ptr->data();
// todo: maybe do not need to build name to index map every time
auto name_to_pos_map = block->get_name_to_pos_map();
const auto& original_transaction_column = assert_cast<const ColumnInt64&>(*remove_nullable(
block->get_by_position(
name_to_pos_map[TransactionalHive::ORIGINAL_TRANSACTION_LOWER_CASE])
block->get_by_position((*_col_name_to_block_idx)
[TransactionalHive::ORIGINAL_TRANSACTION_LOWER_CASE])
.column));
const auto& bucket_id_column = assert_cast<const ColumnInt32&>(*remove_nullable(
block->get_by_position(name_to_pos_map[TransactionalHive::BUCKET_LOWER_CASE])
block->get_by_position(
(*_col_name_to_block_idx)[TransactionalHive::BUCKET_LOWER_CASE])
.column));
const auto& row_id_column = assert_cast<const ColumnInt64&>(*remove_nullable(
block->get_by_position(name_to_pos_map[TransactionalHive::ROW_ID_LOWER_CASE])
block->get_by_position(
(*_col_name_to_block_idx)[TransactionalHive::ROW_ID_LOWER_CASE])
.column));
for (int i = 0; i < rows; ++i) {
auto original_transaction = original_transaction_column.get_int(i);
Expand All @@ -2480,15 +2479,13 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
size_t origin_column_num = block->columns();

if (!_dict_cols_has_converted && !_dict_filter_cols.empty()) {
// todo: maybe do not need to build name to index map every time
auto name_to_pos_map = block->get_name_to_pos_map();
for (auto& dict_filter_cols : _dict_filter_cols) {
if (!name_to_pos_map.contains(dict_filter_cols.first)) {
if (!_col_name_to_block_idx->contains(dict_filter_cols.first)) {
return Status::InternalError("Failed to find dict filter column '{}' in block {}",
dict_filter_cols.first, block->dump_structure());
}
MutableColumnPtr dict_col_ptr = ColumnInt32::create();
auto pos = name_to_pos_map[dict_filter_cols.first];
auto pos = (*_col_name_to_block_idx)[dict_filter_cols.first];
auto& column_with_type_and_name = block->get_by_position(pos);
auto& column_type = column_with_type_and_name.type;
if (column_type->is_nullable()) {
Expand All @@ -2514,10 +2511,9 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(),
TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end());
}
// todo: maybe do not need to build name to index map every time
auto name_to_pos_map = block->get_name_to_pos_map();
for (auto& table_col_name : table_col_names) {
auto& column_with_type_and_name = block->get_by_position(name_to_pos_map[table_col_name]);
auto& column_with_type_and_name =
block->get_by_position((*_col_name_to_block_idx)[table_col_name]);
auto& column_ptr = column_with_type_and_name.column;
auto& column_type = column_with_type_and_name.type;
auto file_column_name = _table_info_node_ptr->children_file_column_name(table_col_name);
Expand Down Expand Up @@ -2569,13 +2565,19 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
if (can_filter_all) {
for (auto& col : table_col_names) {
// clean block to read predicate columns and acid columns
block->get_by_position(name_to_pos_map[col]).column->assume_mutable()->clear();
block->get_by_position((*_col_name_to_block_idx)[col])
.column->assume_mutable()
->clear();
}
for (auto& col : _lazy_read_ctx.predicate_partition_columns) {
block->get_by_position(name_to_pos_map[col.first]).column->assume_mutable()->clear();
block->get_by_position((*_col_name_to_block_idx)[col.first])
.column->assume_mutable()
->clear();
}
for (auto& col : _lazy_read_ctx.predicate_missing_columns) {
block->get_by_position(name_to_pos_map[col.first]).column->assume_mutable()->clear();
block->get_by_position((*_col_name_to_block_idx)[col.first])
.column->assume_mutable()
->clear();
}
Block::erase_useless_column(block, origin_column_num);
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr));
Expand Down Expand Up @@ -2885,14 +2887,12 @@ Status OrcReader::_convert_dict_cols_to_string_cols(
return Status::OK();
}
if (!_dict_filter_cols.empty()) {
// todo: maybe do not need to build name to index map every time
auto name_to_pos_map = block->get_name_to_pos_map();
for (auto& dict_filter_cols : _dict_filter_cols) {
if (!name_to_pos_map.contains(dict_filter_cols.first)) {
if (!_col_name_to_block_idx->contains(dict_filter_cols.first)) {
return Status::InternalError("Failed to find dict filter column '{}' in block {}",
dict_filter_cols.first, block->dump_structure());
}
auto pos = name_to_pos_map[dict_filter_cols.first];
auto pos = (*_col_name_to_block_idx)[dict_filter_cols.first];
ColumnWithTypeAndName& column_with_type_and_name = block->get_by_position(pos);
const ColumnPtr& column = column_with_type_and_name.column;

Expand Down
10 changes: 7 additions & 3 deletions be/src/vec/exec/format/orc/vorc_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,10 @@ class OrcReader : public GenericReader {
~OrcReader() override = default;
// To read the file by column index instead of by column name, set hive_use_column_names to false.
Status init_reader(
const std::vector<std::string>* column_names, const VExprContextSPtrs& conjuncts,
bool is_acid, const TupleDescriptor* tuple_descriptor,
const RowDescriptor* row_descriptor,
const std::vector<std::string>* column_names,
std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
const VExprContextSPtrs& conjuncts, bool is_acid,
const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts,
std::shared_ptr<TableSchemaChangeHelper::Node> table_info_node_ptr =
Expand Down Expand Up @@ -726,6 +727,9 @@ class OrcReader : public GenericReader {
std::set<uint64_t> _column_ids;
std::set<uint64_t> _filter_column_ids;

// Pointer to external column name to block index mapping (from FileScanner)
std::unordered_map<std::string, uint32_t>* _col_name_to_block_idx = nullptr;

VExprSPtrs _push_down_exprs;
};

Expand Down
Loading
Loading