Skip to content

Commit fcacd4c

Browse files
committed
fix pipeline
1 parent 43f2d40 commit fcacd4c

File tree

11 files changed

+241
-13
lines changed

11 files changed

+241
-13
lines changed

be/src/runtime_filter/runtime_filter_consumer_helper.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ class RuntimeFilterConsumerHelper {
5252
// parent_operator_profile is owned by LocalState so update it is safe at here.
5353
void collect_realtime_profile(RuntimeProfile* parent_operator_profile);
5454

55+
size_t runtime_filter_nums() const { return _runtime_filter_descs.size(); }
56+
5557
private:
5658
// Append late-arrival runtime filters to the vconjunct_ctx.
5759
Status _append_rf_into_conjuncts(RuntimeState* state,

be/src/vec/exec/format/parquet/vparquet_group_reader.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,14 @@ class RowGroupReader : public ProfileCollector {
7979

8080
// table name
8181
struct LazyReadContext {
82+
// all conjuncts: in sql, join runtime filter, topn runtime filter.
8283
VExprContextSPtrs conjuncts;
84+
85+
// ParquetReader::set_fill_columns(xxx, xxx) will set these two members
86+
std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
87+
fill_partition_columns;
88+
std::unordered_map<std::string, VExprContextSPtr> fill_missing_columns;
89+
8390
bool can_lazy_read = false;
8491
// block->rows() returns the number of rows of the first column,
8592
// so we should check and resize the first column

be/src/vec/exec/format/parquet/vparquet_reader.cpp

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -408,11 +408,17 @@ bool ParquetReader::_type_matches(const VSlotRef* slot_ref) const {
408408
!is_complex_type(table_col_type->get_primitive_type());
409409
}
410410

411-
Status ParquetReader::set_fill_columns(
412-
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
413-
partition_columns,
414-
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
415-
SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
411+
Status ParquetReader::_update_lazy_read_ctx(const VExprContextSPtrs& new_conjuncts) {
412+
RowGroupReader::LazyReadContext new_lazy_read_ctx;
413+
new_lazy_read_ctx.conjuncts = new_conjuncts;
414+
new_lazy_read_ctx.fill_partition_columns = std::move(_lazy_read_ctx.fill_partition_columns);
415+
new_lazy_read_ctx.fill_missing_columns = std::move(_lazy_read_ctx.fill_missing_columns);
416+
_lazy_read_ctx = std::move(new_lazy_read_ctx);
417+
418+
_top_runtime_vexprs.clear();
419+
_push_down_predicates.clear();
420+
_useless_predicates.clear();
421+
416422
// std::unordered_map<column_name, std::pair<col_id, slot_id>>
417423
std::unordered_map<std::string, std::pair<uint32_t, int>> predicate_columns;
418424
// visit_slot for lazy mat.
@@ -519,7 +525,7 @@ Status ParquetReader::set_fill_columns(
519525
_lazy_read_ctx.all_predicate_col_ids.emplace_back(_row_id_column_iterator_pair.second);
520526
}
521527

522-
for (auto& kv : partition_columns) {
528+
for (auto& kv : _lazy_read_ctx.fill_partition_columns) {
523529
auto iter = predicate_columns.find(kv.first);
524530
if (iter == predicate_columns.end()) {
525531
_lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
@@ -529,7 +535,7 @@ Status ParquetReader::set_fill_columns(
529535
}
530536
}
531537

532-
for (auto& kv : missing_columns) {
538+
for (auto& kv : _lazy_read_ctx.fill_missing_columns) {
533539
auto iter = predicate_columns.find(kv.first);
534540
if (iter == predicate_columns.end()) {
535541
_lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);
@@ -561,6 +567,17 @@ Status ParquetReader::set_fill_columns(
561567
}
562568
}
563569

570+
return Status::OK();
571+
}
572+
573+
Status ParquetReader::set_fill_columns(
574+
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
575+
partition_columns,
576+
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
577+
_lazy_read_ctx.fill_partition_columns = partition_columns;
578+
_lazy_read_ctx.fill_missing_columns = missing_columns;
579+
RETURN_IF_ERROR(_update_lazy_read_ctx(_lazy_read_ctx.conjuncts));
580+
564581
if (_filter_groups && (_total_groups == 0 || _t_metadata->num_rows == 0 || _range_size < 0)) {
565582
return Status::EndOfFile("No row group to read");
566583
}
@@ -698,6 +715,13 @@ Status ParquetReader::_next_row_group_reader() {
698715
continue;
699716
}
700717

718+
bool has_late_rf_cond = false;
719+
VExprContextSPtrs new_push_down_conjuncts;
720+
RETURN_IF_ERROR(_call_late_rf_func(&has_late_rf_cond, new_push_down_conjuncts));
721+
if (has_late_rf_cond) {
722+
RETURN_IF_ERROR(_update_lazy_read_ctx(new_push_down_conjuncts));
723+
}
724+
701725
size_t before_predicate_size = _push_down_predicates.size();
702726
_push_down_predicates.reserve(before_predicate_size + _top_runtime_vexprs.size());
703727
for (const auto& vexpr : _top_runtime_vexprs) {

be/src/vec/exec/format/parquet/vparquet_reader.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,10 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {
165165

166166
bool count_read_rows() override { return true; }
167167

168+
void set_update_late_rf_func(std::function<Status(bool*, VExprContextSPtrs&)>&& func) {
169+
_call_late_rf_func = std::move(func);
170+
}
171+
168172
protected:
169173
void _collect_profile_before_close() override;
170174

@@ -256,6 +260,9 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {
256260
bool _exists_in_file(const VSlotRef* slot) const override;
257261
bool _type_matches(const VSlotRef*) const override;
258262

263+
// update lazy read context when runtime filter changed
264+
Status _update_lazy_read_ctx(const VExprContextSPtrs& new_conjuncts);
265+
259266
RuntimeProfile* _profile = nullptr;
260267
const TFileScanRangeParams& _scan_params;
261268
const TFileRangeDesc& _scan_range;
@@ -345,6 +352,13 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {
345352
std::vector<std::unique_ptr<MutilColumnBlockPredicate>> _push_down_predicates;
346353
std::vector<std::shared_ptr<ColumnPredicate>> _useless_predicates;
347354
Arena _arena;
355+
356+
// when create new row group reader, call this function to get lasted runtime filter conjuncts.
357+
std::function<Status(bool*, VExprContextSPtrs&)> _call_late_rf_func = [](bool* changed,
358+
VExprContextSPtrs&) {
359+
*changed = false;
360+
return Status::OK();
361+
};
348362
};
349363
#include "common/compile_check_end.h"
350364

be/src/vec/exec/scan/file_scanner.cpp

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -357,15 +357,19 @@ Status FileScanner::_process_conjuncts() {
357357
return Status::OK();
358358
}
359359

360-
Status FileScanner::_process_late_arrival_conjuncts() {
360+
Status FileScanner::_process_late_arrival_conjuncts(bool* changed,
361+
VExprContextSPtrs& new_push_down_conjuncts) {
362+
*changed = false;
361363
if (_push_down_conjuncts.size() < _conjuncts.size()) {
364+
*changed = true;
362365
_push_down_conjuncts.clear();
363366
_push_down_conjuncts.resize(_conjuncts.size());
364367
for (size_t i = 0; i != _conjuncts.size(); ++i) {
365368
RETURN_IF_ERROR(_conjuncts[i]->clone(_state, _push_down_conjuncts[i]));
366369
}
367370
RETURN_IF_ERROR(_process_conjuncts());
368371
_discard_conjuncts();
372+
new_push_down_conjuncts = _push_down_conjuncts;
369373
}
370374
if (_applied_rf_num == _total_rf_num) {
371375
_local_state->scanner_profile()->add_info_string("ApplyAllRuntimeFilters", "True");
@@ -1051,9 +1055,17 @@ Status FileScanner::_get_next_reader() {
10511055
// ATTN: the push down agg type may be set back to NONE,
10521056
// see IcebergTableReader::init_row_filters for example.
10531057
parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
1054-
if (push_down_predicates) {
1055-
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
1056-
}
1058+
1059+
std::function<Status(bool*, VExprContextSPtrs&)> update_late_rf =
1060+
[&](bool* changed, VExprContextSPtrs& new_push_down_conjuncts) -> Status {
1061+
if (!_is_load) {
1062+
RETURN_IF_ERROR(try_append_late_arrival_runtime_filter());
1063+
RETURN_IF_ERROR(
1064+
_process_late_arrival_conjuncts(changed, new_push_down_conjuncts));
1065+
}
1066+
return Status::OK();
1067+
};
1068+
parquet_reader->set_update_late_rf_func(std::move(update_late_rf));
10571069
RETURN_IF_ERROR(_init_parquet_reader(std::move(parquet_reader), file_meta_cache_ptr));
10581070

10591071
need_to_get_parsed_schema = true;
@@ -1074,7 +1086,9 @@ Status FileScanner::_get_next_reader() {
10741086

10751087
orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
10761088
if (push_down_predicates) {
1077-
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
1089+
bool changed = false;
1090+
VExprContextSPtrs new_push_down_conjuncts;
1091+
RETURN_IF_ERROR(_process_late_arrival_conjuncts(&changed, new_push_down_conjuncts));
10781092
}
10791093
RETURN_IF_ERROR(_init_orc_reader(std::move(orc_reader), file_meta_cache_ptr));
10801094

be/src/vec/exec/scan/file_scanner.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,8 @@ class FileScanner : public Scanner {
251251
void _init_runtime_filter_partition_prune_block();
252252
Status _process_runtime_filters_partition_prune(bool& is_partition_pruned);
253253
Status _process_conjuncts();
254-
Status _process_late_arrival_conjuncts();
254+
Status _process_late_arrival_conjuncts(bool* changed,
255+
VExprContextSPtrs& new_push_down_conjuncts);
255256
void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
256257
Status _generate_truncate_columns(bool need_to_get_parsed_schema);
257258
Status _set_fill_or_truncate_columns(bool need_to_get_parsed_schema);

be/src/vec/exec/scan/scanner.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ Scanner::Scanner(RuntimeState* state, pipeline::ScanLocalStateBase* local_state,
4141
_output_tuple_desc(_local_state->output_tuple_desc()),
4242
_output_row_descriptor(_local_state->_parent->output_row_descriptor()),
4343
_has_prepared(false) {
44+
_total_rf_num = cast_set<int>(_local_state->_helper.runtime_filter_nums());
4445
DorisMetrics::instance()->scanner_cnt->increment(1);
4546
}
4647

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
use `default`;
2+
3+
create table fact_big (
4+
k INT,
5+
c1 INT,
6+
c2 BIGINT,
7+
c3 DOUBLE,
8+
c4 STRING
9+
)stored as parquet
10+
LOCATION '/user/doris/preinstalled_data/parquet_table/runtime_filter_fact_big';
11+
12+
create table dim_small (
13+
k INT,
14+
c1 INT,
15+
c2 BIGINT
16+
)stored as parquet
17+
LOCATION '/user/doris/preinstalled_data/parquet_table/runtime_filter_dim_small';
18+
19+
msck repair table fact_big;
20+
msck repair table dim_small;

0 commit comments

Comments
 (0)