Skip to content

Commit 348c63c

Browse files
authored
branch-4.0: [Enhancement](parquet)update runtime filter when read next parquet row group. (#59053) (#59725)
bp #59053 bp #59557 ### What problem does this PR solve? Problem Summary: This pull request achieves better filtering by fetching the latest join runtime filter when creating the Parquet row group reader. Previously, the join runtime filter was fetched at the Parquet file level.
1 parent a175e04 commit 348c63c

File tree

11 files changed

+272
-13
lines changed

11 files changed

+272
-13
lines changed

be/src/runtime_filter/runtime_filter_consumer_helper.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ class RuntimeFilterConsumerHelper {
5252
// parent_operator_profile is owned by LocalState so update it is safe at here.
5353
void collect_realtime_profile(RuntimeProfile* parent_operator_profile);
5454

55+
size_t runtime_filter_nums() const { return _runtime_filter_descs.size(); }
56+
5557
private:
5658
// Append late-arrival runtime filters to the vconjunct_ctx.
5759
Status _append_rf_into_conjuncts(RuntimeState* state,

be/src/vec/exec/format/parquet/vparquet_group_reader.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,14 @@ class RowGroupReader : public ProfileCollector {
7979

8080
// table name
8181
struct LazyReadContext {
82+
// all conjuncts: in sql, join runtime filter, topn runtime filter.
8283
VExprContextSPtrs conjuncts;
84+
85+
// ParquetReader::set_fill_columns(xxx, xxx) will set these two members
86+
std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
87+
fill_partition_columns;
88+
std::unordered_map<std::string, VExprContextSPtr> fill_missing_columns;
89+
8390
bool can_lazy_read = false;
8491
// block->rows() returns the number of rows of the first column,
8592
// so we should check and resize the first column

be/src/vec/exec/format/parquet/vparquet_reader.cpp

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -388,11 +388,16 @@ bool ParquetReader::_type_matches(const VSlotRef* slot_ref) const {
388388
!is_complex_type(table_col_type->get_primitive_type());
389389
}
390390

391-
Status ParquetReader::set_fill_columns(
392-
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
393-
partition_columns,
394-
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
395-
SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
391+
Status ParquetReader::_update_lazy_read_ctx(const VExprContextSPtrs& new_conjuncts) {
392+
RowGroupReader::LazyReadContext new_lazy_read_ctx;
393+
new_lazy_read_ctx.conjuncts = new_conjuncts;
394+
new_lazy_read_ctx.fill_partition_columns = std::move(_lazy_read_ctx.fill_partition_columns);
395+
new_lazy_read_ctx.fill_missing_columns = std::move(_lazy_read_ctx.fill_missing_columns);
396+
_lazy_read_ctx = std::move(new_lazy_read_ctx);
397+
398+
_top_runtime_vexprs.clear();
399+
_push_down_predicates.clear();
400+
396401
// std::unordered_map<column_name, std::pair<col_id, slot_id>>
397402
std::unordered_map<std::string, std::pair<uint32_t, int>> predicate_columns;
398403
// visit_slot for lazy mat.
@@ -499,7 +504,7 @@ Status ParquetReader::set_fill_columns(
499504
_lazy_read_ctx.all_predicate_col_ids.emplace_back(_row_id_column_iterator_pair.second);
500505
}
501506

502-
for (auto& kv : partition_columns) {
507+
for (auto& kv : _lazy_read_ctx.fill_partition_columns) {
503508
auto iter = predicate_columns.find(kv.first);
504509
if (iter == predicate_columns.end()) {
505510
_lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
@@ -509,7 +514,7 @@ Status ParquetReader::set_fill_columns(
509514
}
510515
}
511516

512-
for (auto& kv : missing_columns) {
517+
for (auto& kv : _lazy_read_ctx.fill_missing_columns) {
513518
auto iter = predicate_columns.find(kv.first);
514519
if (iter == predicate_columns.end()) {
515520
_lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);
@@ -541,6 +546,17 @@ Status ParquetReader::set_fill_columns(
541546
}
542547
}
543548

549+
return Status::OK();
550+
}
551+
552+
Status ParquetReader::set_fill_columns(
553+
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
554+
partition_columns,
555+
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
556+
_lazy_read_ctx.fill_partition_columns = partition_columns;
557+
_lazy_read_ctx.fill_missing_columns = missing_columns;
558+
RETURN_IF_ERROR(_update_lazy_read_ctx(_lazy_read_ctx.conjuncts));
559+
544560
if (_filter_groups && (_total_groups == 0 || _t_metadata->num_rows == 0 || _range_size < 0)) {
545561
return Status::EndOfFile("No row group to read");
546562
}
@@ -678,6 +694,13 @@ Status ParquetReader::_next_row_group_reader() {
678694
continue;
679695
}
680696

697+
bool has_late_rf_cond = false;
698+
VExprContextSPtrs new_push_down_conjuncts;
699+
RETURN_IF_ERROR(_call_late_rf_func(&has_late_rf_cond, new_push_down_conjuncts));
700+
if (has_late_rf_cond) {
701+
RETURN_IF_ERROR(_update_lazy_read_ctx(new_push_down_conjuncts));
702+
}
703+
681704
size_t before_predicate_size = _push_down_predicates.size();
682705
_push_down_predicates.reserve(before_predicate_size + _top_runtime_vexprs.size());
683706
for (const auto& vexpr : _top_runtime_vexprs) {

be/src/vec/exec/format/parquet/vparquet_reader.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,10 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {
162162

163163
bool count_read_rows() override { return true; }
164164

165+
void set_update_late_rf_func(std::function<Status(bool*, VExprContextSPtrs&)>&& func) {
166+
_call_late_rf_func = std::move(func);
167+
}
168+
165169
protected:
166170
void _collect_profile_before_close() override;
167171

@@ -254,6 +258,9 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {
254258
bool _exists_in_file(const VSlotRef* slot) const override;
255259
bool _type_matches(const VSlotRef*) const override;
256260

261+
// update lazy read context when runtime filter changed
262+
Status _update_lazy_read_ctx(const VExprContextSPtrs& new_conjuncts);
263+
257264
RuntimeProfile* _profile = nullptr;
258265
const TFileScanRangeParams& _scan_params;
259266
const TFileRangeDesc& _scan_range;
@@ -341,6 +348,15 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {
341348
std::vector<std::unique_ptr<MutilColumnBlockPredicate>> _push_down_predicates;
342349
std::vector<std::unique_ptr<ColumnPredicate>> _useless_predicates;
343350
Arena _arena;
351+
352+
// when creating a new row group reader, call this function to get the latest runtime filter conjuncts.
353+
// The default implementation does nothing, sets 'changed' to false, and returns OK.
354+
// This is used when iceberg read position delete file ...
355+
static Status default_late_rf_func(bool* changed, VExprContextSPtrs&) {
356+
*changed = false;
357+
return Status::OK();
358+
}
359+
std::function<Status(bool*, VExprContextSPtrs&)> _call_late_rf_func = default_late_rf_func;
344360
};
345361
#include "common/compile_check_end.h"
346362

be/src/vec/exec/scan/file_scanner.cpp

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -359,15 +359,19 @@ Status FileScanner::_process_conjuncts() {
359359
return Status::OK();
360360
}
361361

362-
Status FileScanner::_process_late_arrival_conjuncts() {
362+
Status FileScanner::_process_late_arrival_conjuncts(bool* changed,
363+
VExprContextSPtrs& new_push_down_conjuncts) {
364+
*changed = false;
363365
if (_push_down_conjuncts.size() < _conjuncts.size()) {
366+
*changed = true;
364367
_push_down_conjuncts.clear();
365368
_push_down_conjuncts.resize(_conjuncts.size());
366369
for (size_t i = 0; i != _conjuncts.size(); ++i) {
367370
RETURN_IF_ERROR(_conjuncts[i]->clone(_state, _push_down_conjuncts[i]));
368371
}
369372
RETURN_IF_ERROR(_process_conjuncts());
370373
_discard_conjuncts();
374+
new_push_down_conjuncts = _push_down_conjuncts;
371375
}
372376
if (_applied_rf_num == _total_rf_num) {
373377
_local_state->scanner_profile()->add_info_string("ApplyAllRuntimeFilters", "True");
@@ -1057,9 +1061,17 @@ Status FileScanner::_get_next_reader() {
10571061
// ATTN: the push down agg type may be set back to NONE,
10581062
// see IcebergTableReader::init_row_filters for example.
10591063
parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
1060-
if (push_down_predicates) {
1061-
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
1062-
}
1064+
1065+
std::function<Status(bool*, VExprContextSPtrs&)> update_late_rf =
1066+
[&](bool* changed, VExprContextSPtrs& new_push_down_conjuncts) -> Status {
1067+
if (!_is_load) {
1068+
RETURN_IF_ERROR(try_append_late_arrival_runtime_filter());
1069+
RETURN_IF_ERROR(
1070+
_process_late_arrival_conjuncts(changed, new_push_down_conjuncts));
1071+
}
1072+
return Status::OK();
1073+
};
1074+
parquet_reader->set_update_late_rf_func(std::move(update_late_rf));
10631075
RETURN_IF_ERROR(_init_parquet_reader(std::move(parquet_reader), file_meta_cache_ptr));
10641076

10651077
need_to_get_parsed_schema = true;
@@ -1080,7 +1092,9 @@ Status FileScanner::_get_next_reader() {
10801092

10811093
orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
10821094
if (push_down_predicates) {
1083-
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
1095+
bool changed = false;
1096+
VExprContextSPtrs new_push_down_conjuncts;
1097+
RETURN_IF_ERROR(_process_late_arrival_conjuncts(&changed, new_push_down_conjuncts));
10841098
}
10851099
RETURN_IF_ERROR(_init_orc_reader(std::move(orc_reader), file_meta_cache_ptr));
10861100

be/src/vec/exec/scan/file_scanner.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,8 @@ class FileScanner : public Scanner {
253253
void _init_runtime_filter_partition_prune_block();
254254
Status _process_runtime_filters_partition_prune(bool& is_partition_pruned);
255255
Status _process_conjuncts();
256-
Status _process_late_arrival_conjuncts();
256+
Status _process_late_arrival_conjuncts(bool* changed,
257+
VExprContextSPtrs& new_push_down_conjuncts);
257258
void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
258259
Status _generate_truncate_columns(bool need_to_get_parsed_schema);
259260
Status _set_fill_or_truncate_columns(bool need_to_get_parsed_schema);

be/src/vec/exec/scan/scanner.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ Scanner::Scanner(RuntimeState* state, pipeline::ScanLocalStateBase* local_state,
4141
_output_tuple_desc(_local_state->output_tuple_desc()),
4242
_output_row_descriptor(_local_state->_parent->output_row_descriptor()),
4343
_has_prepared(false) {
44+
_total_rf_num = cast_set<int>(_local_state->_helper.runtime_filter_nums());
4445
DorisMetrics::instance()->scanner_cnt->increment(1);
4546
}
4647

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
use `default`;
2+
3+
create table fact_big (
4+
k INT,
5+
c1 INT,
6+
c2 BIGINT,
7+
c3 DOUBLE,
8+
c4 STRING
9+
)stored as parquet
10+
LOCATION '/user/doris/preinstalled_data/parquet_table/runtime_filter_fact_big';
11+
12+
create table dim_small (
13+
k INT,
14+
c1 INT,
15+
c2 BIGINT
16+
)stored as parquet
17+
LOCATION '/user/doris/preinstalled_data/parquet_table/runtime_filter_dim_small';
18+
19+
msck repair table fact_big;
20+
msck repair table dim_small;

0 commit comments

Comments
 (0)