Skip to content

Commit b0d62d6

Browse files
committed
[Enhancement](parquet) Update runtime filters when reading the next Parquet row group.
1 parent 43f2d40 commit b0d62d6

File tree

8 files changed

+81
-18
lines changed

8 files changed

+81
-18
lines changed

be/src/runtime_filter/runtime_filter_consumer_helper.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ class RuntimeFilterConsumerHelper {
5252
// parent_operator_profile is owned by LocalState, so updating it here is safe.
5353
void collect_realtime_profile(RuntimeProfile* parent_operator_profile);
5454

55+
size_t rf_num() {
56+
return _runtime_filter_descs.size();
57+
}
5558
private:
5659
// Append late-arrival runtime filters to the vconjunct_ctx.
5760
Status _append_rf_into_conjuncts(RuntimeState* state,

be/src/vec/exec/format/parquet/vparquet_group_reader.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_
318318

319319
RETURN_IF_ERROR(_fill_row_id_columns(block, *read_rows, modify_row_ids));
320320

321-
Status st = VExprContext::filter_block(_lazy_read_ctx.conjuncts, block, block->columns());
321+
Status st = VExprContext::filter_block(*_lazy_read_ctx.conjuncts, block, block->columns());
322322
*read_rows = block->rows();
323323
return st;
324324
}
@@ -349,7 +349,7 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_
349349
for (uint32_t i = 0; i < column_to_keep; ++i) {
350350
columns_to_filter[i] = i;
351351
}
352-
if (!_lazy_read_ctx.conjuncts.empty()) {
352+
if (!_lazy_read_ctx.conjuncts->empty()) {
353353
std::vector<IColumn::Filter*> filters;
354354
if (_position_delete_ctx.has_filter) {
355355
filters.push_back(_pos_delete_filter_ptr.get());

be/src/vec/exec/format/parquet/vparquet_group_reader.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,13 @@ class RowGroupReader : public ProfileCollector {
7979

8080
// table name
8181
struct LazyReadContext {
82-
VExprContextSPtrs conjuncts;
82+
const VExprContextSPtrs* conjuncts;
83+
84+
//ParquetReader::set_fill_columns
85+
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>> *
86+
fill_partition_columns;
87+
const std::unordered_map<std::string, VExprContextSPtr>* fill_missing_columns;
88+
8389
bool can_lazy_read = false;
8490
// block->rows() returns the number of rows of the first column,
8591
// so we should check and resize the first column

be/src/vec/exec/format/parquet/vparquet_reader.cpp

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ Status ParquetReader::init_reader(
382382
}
383383
}
384384
// build column predicates for column lazy read
385-
_lazy_read_ctx.conjuncts = conjuncts;
385+
_lazy_read_ctx.conjuncts = &conjuncts;
386386
return Status::OK();
387387
}
388388

@@ -408,11 +408,18 @@ bool ParquetReader::_type_matches(const VSlotRef* slot_ref) const {
408408
!is_complex_type(table_col_type->get_primitive_type());
409409
}
410410

411-
Status ParquetReader::set_fill_columns(
412-
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
413-
partition_columns,
414-
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
415-
SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
411+
412+
Status ParquetReader::_update_lazy_read_ctx() {
413+
RowGroupReader::LazyReadContext new_lazy_read_ctx;
414+
new_lazy_read_ctx.conjuncts = _lazy_read_ctx.conjuncts;
415+
new_lazy_read_ctx.fill_partition_columns = _lazy_read_ctx.fill_partition_columns;
416+
new_lazy_read_ctx.fill_missing_columns = _lazy_read_ctx.fill_missing_columns;
417+
_lazy_read_ctx = std::move(new_lazy_read_ctx);
418+
419+
_top_runtime_vexprs.clear();
420+
_push_down_predicates.clear();
421+
_useless_predicates.clear();
422+
416423
// std::unordered_map<column_name, std::pair<col_id, slot_id>>
417424
std::unordered_map<std::string, std::pair<uint32_t, int>> predicate_columns;
418425
// visit_slot for lazy mat.
@@ -432,7 +439,7 @@ Status ParquetReader::set_fill_columns(
432439
}
433440
};
434441

435-
for (const auto& conjunct : _lazy_read_ctx.conjuncts) {
442+
for (const auto& conjunct : *_lazy_read_ctx.conjuncts) {
436443
auto expr = conjunct->root();
437444

438445
if (expr->is_rf_wrapper()) {
@@ -454,8 +461,8 @@ Status ParquetReader::set_fill_columns(
454461

455462
int max_in_size =
456463
_state->query_options().__isset.max_pushdown_conditions_per_column
457-
? _state->query_options().max_pushdown_conditions_per_column
458-
: 1024;
464+
? _state->query_options().max_pushdown_conditions_per_column
465+
: 1024;
459466
if (direct_in_predicate->get_set_func()->size() == 0 ||
460467
direct_in_predicate->get_set_func()->size() > max_in_size) {
461468
continue;
@@ -519,7 +526,7 @@ Status ParquetReader::set_fill_columns(
519526
_lazy_read_ctx.all_predicate_col_ids.emplace_back(_row_id_column_iterator_pair.second);
520527
}
521528

522-
for (auto& kv : partition_columns) {
529+
for (auto& kv : *_lazy_read_ctx.fill_partition_columns) {
523530
auto iter = predicate_columns.find(kv.first);
524531
if (iter == predicate_columns.end()) {
525532
_lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
@@ -529,7 +536,7 @@ Status ParquetReader::set_fill_columns(
529536
}
530537
}
531538

532-
for (auto& kv : missing_columns) {
539+
for (auto& kv : *_lazy_read_ctx.fill_missing_columns) {
533540
auto iter = predicate_columns.find(kv.first);
534541
if (iter == predicate_columns.end()) {
535542
_lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);
@@ -561,6 +568,19 @@ Status ParquetReader::set_fill_columns(
561568
}
562569
}
563570

571+
return Status::OK();
572+
}
573+
574+
Status ParquetReader::set_fill_columns(
575+
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
576+
partition_columns,
577+
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
578+
// SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
579+
580+
_lazy_read_ctx.fill_partition_columns = &partition_columns;
581+
_lazy_read_ctx.fill_missing_columns = &missing_columns;
582+
RETURN_IF_ERROR(_update_lazy_read_ctx());
583+
564584
if (_filter_groups && (_total_groups == 0 || _t_metadata->num_rows == 0 || _range_size < 0)) {
565585
return Status::EndOfFile("No row group to read");
566586
}
@@ -698,6 +718,12 @@ Status ParquetReader::_next_row_group_reader() {
698718
continue;
699719
}
700720

721+
722+
bool has_late_rf_cond = false;
723+
RETURN_IF_ERROR(_call_late_rf_func(&has_late_rf_cond));
724+
if (has_late_rf_cond) {
725+
RETURN_IF_ERROR(_update_lazy_read_ctx());
726+
}
701727
size_t before_predicate_size = _push_down_predicates.size();
702728
_push_down_predicates.reserve(before_predicate_size + _top_runtime_vexprs.size());
703729
for (const auto& vexpr : _top_runtime_vexprs) {

be/src/vec/exec/format/parquet/vparquet_reader.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,17 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {
165165

166166
bool count_read_rows() override { return true; }
167167

168+
void set_update_late_rf_func(std::function<Status(bool*)>&& func) {
169+
_call_late_rf_func = std::move(func);
170+
}
171+
172+
std::function<Status(bool*)> _call_late_rf_func = [](bool* changed) {
173+
*changed = false;
174+
return Status::OK();
175+
};
176+
177+
Status _update_lazy_read_ctx();
178+
168179
protected:
169180
void _collect_profile_before_close() override;
170181

@@ -345,6 +356,7 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {
345356
std::vector<std::unique_ptr<MutilColumnBlockPredicate>> _push_down_predicates;
346357
std::vector<std::shared_ptr<ColumnPredicate>> _useless_predicates;
347358
Arena _arena;
359+
348360
};
349361
#include "common/compile_check_end.h"
350362

be/src/vec/exec/scan/file_scanner.cpp

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -357,8 +357,10 @@ Status FileScanner::_process_conjuncts() {
357357
return Status::OK();
358358
}
359359

360-
Status FileScanner::_process_late_arrival_conjuncts() {
360+
Status FileScanner::_process_late_arrival_conjuncts(bool* changed) {
361+
*changed = false;
361362
if (_push_down_conjuncts.size() < _conjuncts.size()) {
363+
*changed = true;
362364
_push_down_conjuncts.clear();
363365
_push_down_conjuncts.resize(_conjuncts.size());
364366
for (size_t i = 0; i != _conjuncts.size(); ++i) {
@@ -1052,8 +1054,17 @@ Status FileScanner::_get_next_reader() {
10521054
// see IcebergTableReader::init_row_filters for example.
10531055
parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
10541056
if (push_down_predicates) {
1055-
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
1057+
bool changed = false;
1058+
RETURN_IF_ERROR(_process_late_arrival_conjuncts(&changed));
10561059
}
1060+
std::function<Status(bool*)> update_late_rf = [&](bool* changed) -> Status {
1061+
if (!_is_load) {
1062+
RETURN_IF_ERROR(try_append_late_arrival_runtime_filter());
1063+
RETURN_IF_ERROR(_process_late_arrival_conjuncts(changed));
1064+
}
1065+
return Status::OK();
1066+
};
1067+
parquet_reader->set_update_late_rf_func(std::move(update_late_rf));
10571068
RETURN_IF_ERROR(_init_parquet_reader(std::move(parquet_reader), file_meta_cache_ptr));
10581069

10591070
need_to_get_parsed_schema = true;
@@ -1074,7 +1085,8 @@ Status FileScanner::_get_next_reader() {
10741085

10751086
orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
10761087
if (push_down_predicates) {
1077-
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
1088+
bool changed = false;
1089+
RETURN_IF_ERROR(_process_late_arrival_conjuncts(&changed));
10781090
}
10791091
RETURN_IF_ERROR(_init_orc_reader(std::move(orc_reader), file_meta_cache_ptr));
10801092

be/src/vec/exec/scan/file_scanner.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,9 @@ class FileScanner : public Scanner {
100100

101101
void update_realtime_counters() override;
102102

103+
104+
105+
103106
protected:
104107
Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override;
105108

@@ -251,7 +254,7 @@ class FileScanner : public Scanner {
251254
void _init_runtime_filter_partition_prune_block();
252255
Status _process_runtime_filters_partition_prune(bool& is_partition_pruned);
253256
Status _process_conjuncts();
254-
Status _process_late_arrival_conjuncts();
257+
Status _process_late_arrival_conjuncts(bool* changed);
255258
void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
256259
Status _generate_truncate_columns(bool need_to_get_parsed_schema);
257260
Status _set_fill_or_truncate_columns(bool need_to_get_parsed_schema);

be/src/vec/exec/scan/scanner.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ Scanner::Scanner(RuntimeState* state, pipeline::ScanLocalStateBase* local_state,
4141
_output_tuple_desc(_local_state->output_tuple_desc()),
4242
_output_row_descriptor(_local_state->_parent->output_row_descriptor()),
4343
_has_prepared(false) {
44+
_total_rf_num = cast_set<int>(_local_state->_helper.rf_num());
4445
DorisMetrics::instance()->scanner_cnt->increment(1);
4546
}
4647

0 commit comments

Comments
 (0)