Skip to content

Commit 9b897fc

Browse files
hubgeter and morningman
authored and committed
[Enhancement](parquet) Update runtime filters when reading the next parquet row group. (#59053) (#59181)
bp #59053
1 parent ccaef26 commit 9b897fc

File tree

11 files changed

+272
-13
lines changed

11 files changed

+272
-13
lines changed

be/src/runtime_filter/runtime_filter_consumer_helper.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ class RuntimeFilterConsumerHelper {
5252
// parent_operator_profile is owned by LocalState, so it is safe to update it here.
5353
void collect_realtime_profile(RuntimeProfile* parent_operator_profile);
5454

55+
size_t runtime_filter_nums() const { return _runtime_filter_descs.size(); }
56+
5557
private:
5658
// Append late-arrival runtime filters to the vconjunct_ctx.
5759
Status _append_rf_into_conjuncts(RuntimeState* state,

be/src/vec/exec/format/parquet/vparquet_group_reader.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,14 @@ class RowGroupReader : public ProfileCollector {
7979

8080
// table name
8181
struct LazyReadContext {
82+
// All conjuncts: predicates from the SQL, join runtime filters, and topn runtime filters.
8283
VExprContextSPtrs conjuncts;
84+
85+
// ParquetReader::set_fill_columns() sets the following two members.
86+
std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
87+
fill_partition_columns;
88+
std::unordered_map<std::string, VExprContextSPtr> fill_missing_columns;
89+
8390
bool can_lazy_read = false;
8491
// block->rows() returns the number of rows of the first column,
8592
// so we should check and resize the first column

be/src/vec/exec/format/parquet/vparquet_reader.cpp

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -383,11 +383,17 @@ bool ParquetReader::_type_matches(const VSlotRef* slot_ref) const {
383383
!is_complex_type(table_col_type->get_primitive_type());
384384
}
385385

386-
Status ParquetReader::set_fill_columns(
387-
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
388-
partition_columns,
389-
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
390-
SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
386+
Status ParquetReader::_update_lazy_read_ctx(const VExprContextSPtrs& new_conjuncts) {
387+
RowGroupReader::LazyReadContext new_lazy_read_ctx;
388+
new_lazy_read_ctx.conjuncts = new_conjuncts;
389+
new_lazy_read_ctx.fill_partition_columns = std::move(_lazy_read_ctx.fill_partition_columns);
390+
new_lazy_read_ctx.fill_missing_columns = std::move(_lazy_read_ctx.fill_missing_columns);
391+
_lazy_read_ctx = std::move(new_lazy_read_ctx);
392+
393+
_top_runtime_vexprs.clear();
394+
_push_down_predicates.clear();
395+
_useless_predicates.clear();
396+
391397
// std::unordered_map<column_name, std::pair<col_id, slot_id>>
392398
std::unordered_map<std::string, std::pair<uint32_t, int>> predicate_columns;
393399
// visit_slot for lazy materialization.
@@ -494,7 +500,7 @@ Status ParquetReader::set_fill_columns(
494500
_lazy_read_ctx.all_predicate_col_ids.emplace_back(_row_id_column_iterator_pair.second);
495501
}
496502

497-
for (auto& kv : partition_columns) {
503+
for (auto& kv : _lazy_read_ctx.fill_partition_columns) {
498504
auto iter = predicate_columns.find(kv.first);
499505
if (iter == predicate_columns.end()) {
500506
_lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
@@ -504,7 +510,7 @@ Status ParquetReader::set_fill_columns(
504510
}
505511
}
506512

507-
for (auto& kv : missing_columns) {
513+
for (auto& kv : _lazy_read_ctx.fill_missing_columns) {
508514
auto iter = predicate_columns.find(kv.first);
509515
if (iter == predicate_columns.end()) {
510516
_lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);
@@ -536,6 +542,17 @@ Status ParquetReader::set_fill_columns(
536542
}
537543
}
538544

545+
return Status::OK();
546+
}
547+
548+
Status ParquetReader::set_fill_columns(
549+
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
550+
partition_columns,
551+
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
552+
_lazy_read_ctx.fill_partition_columns = partition_columns;
553+
_lazy_read_ctx.fill_missing_columns = missing_columns;
554+
RETURN_IF_ERROR(_update_lazy_read_ctx(_lazy_read_ctx.conjuncts));
555+
539556
if (_filter_groups && (_total_groups == 0 || _t_metadata->num_rows == 0 || _range_size < 0)) {
540557
return Status::EndOfFile("No row group to read");
541558
}
@@ -673,6 +690,13 @@ Status ParquetReader::_next_row_group_reader() {
673690
continue;
674691
}
675692

693+
bool has_late_rf_cond = false;
694+
VExprContextSPtrs new_push_down_conjuncts;
695+
RETURN_IF_ERROR(_call_late_rf_func(&has_late_rf_cond, new_push_down_conjuncts));
696+
if (has_late_rf_cond) {
697+
RETURN_IF_ERROR(_update_lazy_read_ctx(new_push_down_conjuncts));
698+
}
699+
676700
size_t before_predicate_size = _push_down_predicates.size();
677701
_push_down_predicates.reserve(before_predicate_size + _top_runtime_vexprs.size());
678702
for (const auto& vexpr : _top_runtime_vexprs) {

be/src/vec/exec/format/parquet/vparquet_reader.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,10 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {
160160

161161
bool count_read_rows() override { return true; }
162162

163+
void set_update_late_rf_func(std::function<Status(bool*, VExprContextSPtrs&)>&& func) {
164+
_call_late_rf_func = std::move(func);
165+
}
166+
163167
protected:
164168
void _collect_profile_before_close() override;
165169

@@ -252,6 +256,9 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {
252256
bool _exists_in_file(const VSlotRef* slot) const override;
253257
bool _type_matches(const VSlotRef*) const override;
254258

259+
// Update the lazy read context when the runtime filter conjuncts have changed.
260+
Status _update_lazy_read_ctx(const VExprContextSPtrs& new_conjuncts);
261+
255262
RuntimeProfile* _profile = nullptr;
256263
const TFileScanRangeParams& _scan_params;
257264
const TFileRangeDesc& _scan_range;
@@ -337,6 +344,15 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {
337344
std::vector<std::unique_ptr<MutilColumnBlockPredicate>> _push_down_predicates;
338345
std::vector<std::unique_ptr<ColumnPredicate>> _useless_predicates;
339346
Arena _arena;
347+
348+
// Called when creating a new row group reader to get the latest runtime filter conjuncts.
349+
// The default implementation does nothing, sets 'changed' to false, and returns OK.
350+
// This is used, for example, when Iceberg reads a position delete file ...
351+
static Status default_late_rf_func(bool* changed, VExprContextSPtrs&) {
352+
*changed = false;
353+
return Status::OK();
354+
}
355+
std::function<Status(bool*, VExprContextSPtrs&)> _call_late_rf_func = default_late_rf_func;
340356
};
341357
#include "common/compile_check_end.h"
342358

be/src/vec/exec/scan/file_scanner.cpp

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -357,15 +357,19 @@ Status FileScanner::_process_conjuncts() {
357357
return Status::OK();
358358
}
359359

360-
Status FileScanner::_process_late_arrival_conjuncts() {
360+
Status FileScanner::_process_late_arrival_conjuncts(bool* changed,
361+
VExprContextSPtrs& new_push_down_conjuncts) {
362+
*changed = false;
361363
if (_push_down_conjuncts.size() < _conjuncts.size()) {
364+
*changed = true;
362365
_push_down_conjuncts.clear();
363366
_push_down_conjuncts.resize(_conjuncts.size());
364367
for (size_t i = 0; i != _conjuncts.size(); ++i) {
365368
RETURN_IF_ERROR(_conjuncts[i]->clone(_state, _push_down_conjuncts[i]));
366369
}
367370
RETURN_IF_ERROR(_process_conjuncts());
368371
_discard_conjuncts();
372+
new_push_down_conjuncts = _push_down_conjuncts;
369373
}
370374
if (_applied_rf_num == _total_rf_num) {
371375
_local_state->scanner_profile()->add_info_string("ApplyAllRuntimeFilters", "True");
@@ -1045,9 +1049,17 @@ Status FileScanner::_get_next_reader() {
10451049
// ATTN: the push down agg type may be set back to NONE,
10461050
// see IcebergTableReader::init_row_filters for example.
10471051
parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
1048-
if (push_down_predicates) {
1049-
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
1050-
}
1052+
1053+
std::function<Status(bool*, VExprContextSPtrs&)> update_late_rf =
1054+
[&](bool* changed, VExprContextSPtrs& new_push_down_conjuncts) -> Status {
1055+
if (!_is_load) {
1056+
RETURN_IF_ERROR(try_append_late_arrival_runtime_filter());
1057+
RETURN_IF_ERROR(
1058+
_process_late_arrival_conjuncts(changed, new_push_down_conjuncts));
1059+
}
1060+
return Status::OK();
1061+
};
1062+
parquet_reader->set_update_late_rf_func(std::move(update_late_rf));
10511063
RETURN_IF_ERROR(_init_parquet_reader(std::move(parquet_reader), file_meta_cache_ptr));
10521064

10531065
need_to_get_parsed_schema = true;
@@ -1068,7 +1080,9 @@ Status FileScanner::_get_next_reader() {
10681080

10691081
orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
10701082
if (push_down_predicates) {
1071-
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
1083+
bool changed = false;
1084+
VExprContextSPtrs new_push_down_conjuncts;
1085+
RETURN_IF_ERROR(_process_late_arrival_conjuncts(&changed, new_push_down_conjuncts));
10721086
}
10731087
RETURN_IF_ERROR(_init_orc_reader(std::move(orc_reader), file_meta_cache_ptr));
10741088

be/src/vec/exec/scan/file_scanner.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,8 @@ class FileScanner : public Scanner {
251251
void _init_runtime_filter_partition_prune_block();
252252
Status _process_runtime_filters_partition_prune(bool& is_partition_pruned);
253253
Status _process_conjuncts();
254-
Status _process_late_arrival_conjuncts();
254+
Status _process_late_arrival_conjuncts(bool* changed,
255+
VExprContextSPtrs& new_push_down_conjuncts);
255256
void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
256257
Status _generate_truncate_columns(bool need_to_get_parsed_schema);
257258
Status _set_fill_or_truncate_columns(bool need_to_get_parsed_schema);

be/src/vec/exec/scan/scanner.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ Scanner::Scanner(RuntimeState* state, pipeline::ScanLocalStateBase* local_state,
4141
_output_tuple_desc(_local_state->output_tuple_desc()),
4242
_output_row_descriptor(_local_state->_parent->output_row_descriptor()),
4343
_has_prepared(false) {
44+
_total_rf_num = cast_set<int>(_local_state->_helper.runtime_filter_nums());
4445
DorisMetrics::instance()->scanner_cnt->increment(1);
4546
}
4647

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
use `default`;
2+
3+
create table fact_big (
4+
k INT,
5+
c1 INT,
6+
c2 BIGINT,
7+
c3 DOUBLE,
8+
c4 STRING
9+
)stored as parquet
10+
LOCATION '/user/doris/preinstalled_data/parquet_table/runtime_filter_fact_big';
11+
12+
create table dim_small (
13+
k INT,
14+
c1 INT,
15+
c2 BIGINT
16+
)stored as parquet
17+
LOCATION '/user/doris/preinstalled_data/parquet_table/runtime_filter_dim_small';
18+
19+
msck repair table fact_big;
20+
msck repair table dim_small;

0 commit comments

Comments
 (0)