Skip to content

Commit 2937fe0

Browse files
committed
fix pipeline
1 parent 262b16e commit 2937fe0

File tree

11 files changed

+272
-13
lines changed

11 files changed

+272
-13
lines changed

be/src/runtime_filter/runtime_filter_consumer_helper.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ class RuntimeFilterConsumerHelper {
5252
// parent_operator_profile is owned by LocalState so update it is safe at here.
5353
void collect_realtime_profile(RuntimeProfile* parent_operator_profile);
5454

55+
size_t runtime_filter_nums() const { return _runtime_filter_descs.size(); }
56+
5557
private:
5658
// Append late-arrival runtime filters to the vconjunct_ctx.
5759
Status _append_rf_into_conjuncts(RuntimeState* state,

be/src/vec/exec/format/parquet/vparquet_group_reader.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,14 @@ class RowGroupReader : public ProfileCollector {
7979

8080
// table name
8181
struct LazyReadContext {
82+
// all conjuncts: in sql, join runtime filter, topn runtime filter.
8283
VExprContextSPtrs conjuncts;
84+
85+
// ParquetReader::set_fill_columns(xxx, xxx) will set these two members
86+
std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
87+
fill_partition_columns;
88+
std::unordered_map<std::string, VExprContextSPtr> fill_missing_columns;
89+
8390
bool can_lazy_read = false;
8491
// block->rows() returns the number of rows of the first column,
8592
// so we should check and resize the first column

be/src/vec/exec/format/parquet/vparquet_reader.cpp

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -405,11 +405,17 @@ bool ParquetReader::_type_matches(const VSlotRef* slot_ref) const {
405405
!is_complex_type(table_col_type->get_primitive_type());
406406
}
407407

408-
Status ParquetReader::set_fill_columns(
409-
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
410-
partition_columns,
411-
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
412-
SCOPED_RAW_TIMER(&_reader_statistics.parse_meta_time);
408+
Status ParquetReader::_update_lazy_read_ctx(const VExprContextSPtrs& new_conjuncts) {
409+
RowGroupReader::LazyReadContext new_lazy_read_ctx;
410+
new_lazy_read_ctx.conjuncts = new_conjuncts;
411+
new_lazy_read_ctx.fill_partition_columns = std::move(_lazy_read_ctx.fill_partition_columns);
412+
new_lazy_read_ctx.fill_missing_columns = std::move(_lazy_read_ctx.fill_missing_columns);
413+
_lazy_read_ctx = std::move(new_lazy_read_ctx);
414+
415+
_top_runtime_vexprs.clear();
416+
_push_down_predicates.clear();
417+
_useless_predicates.clear();
418+
413419
// std::unordered_map<column_name, std::pair<col_id, slot_id>>
414420
std::unordered_map<std::string, std::pair<uint32_t, int>> predicate_columns;
415421
// visit_slot for lazy mat.
@@ -516,7 +522,7 @@ Status ParquetReader::set_fill_columns(
516522
_lazy_read_ctx.all_predicate_col_ids.emplace_back(_row_id_column_iterator_pair.second);
517523
}
518524

519-
for (auto& kv : partition_columns) {
525+
for (auto& kv : _lazy_read_ctx.fill_partition_columns) {
520526
auto iter = predicate_columns.find(kv.first);
521527
if (iter == predicate_columns.end()) {
522528
_lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
@@ -526,7 +532,7 @@ Status ParquetReader::set_fill_columns(
526532
}
527533
}
528534

529-
for (auto& kv : missing_columns) {
535+
for (auto& kv : _lazy_read_ctx.fill_missing_columns) {
530536
auto iter = predicate_columns.find(kv.first);
531537
if (iter == predicate_columns.end()) {
532538
_lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);
@@ -558,6 +564,17 @@ Status ParquetReader::set_fill_columns(
558564
}
559565
}
560566

567+
return Status::OK();
568+
}
569+
570+
Status ParquetReader::set_fill_columns(
571+
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
572+
partition_columns,
573+
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
574+
_lazy_read_ctx.fill_partition_columns = partition_columns;
575+
_lazy_read_ctx.fill_missing_columns = missing_columns;
576+
RETURN_IF_ERROR(_update_lazy_read_ctx(_lazy_read_ctx.conjuncts));
577+
561578
if (_filter_groups && (_total_groups == 0 || _t_metadata->num_rows == 0 || _range_size < 0)) {
562579
return Status::EndOfFile("No row group to read");
563580
}
@@ -697,6 +714,13 @@ Status ParquetReader::_next_row_group_reader() {
697714
continue;
698715
}
699716

717+
bool has_late_rf_cond = false;
718+
VExprContextSPtrs new_push_down_conjuncts;
719+
RETURN_IF_ERROR(_call_late_rf_func(&has_late_rf_cond, new_push_down_conjuncts));
720+
if (has_late_rf_cond) {
721+
RETURN_IF_ERROR(_update_lazy_read_ctx(new_push_down_conjuncts));
722+
}
723+
700724
size_t before_predicate_size = _push_down_predicates.size();
701725
_push_down_predicates.reserve(before_predicate_size + _top_runtime_vexprs.size());
702726
for (const auto& vexpr : _top_runtime_vexprs) {

be/src/vec/exec/format/parquet/vparquet_reader.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,10 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {
164164

165165
bool count_read_rows() override { return true; }
166166

167+
void set_update_late_rf_func(std::function<Status(bool*, VExprContextSPtrs&)>&& func) {
168+
_call_late_rf_func = std::move(func);
169+
}
170+
167171
protected:
168172
void _collect_profile_before_close() override;
169173

@@ -256,6 +260,9 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {
256260
bool _exists_in_file(const VSlotRef* slot) const override;
257261
bool _type_matches(const VSlotRef*) const override;
258262

263+
// update lazy read context when runtime filter changed
264+
Status _update_lazy_read_ctx(const VExprContextSPtrs& new_conjuncts);
265+
259266
RuntimeProfile* _profile = nullptr;
260267
const TFileScanRangeParams& _scan_params;
261268
const TFileRangeDesc& _scan_range;
@@ -345,6 +352,15 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {
345352
std::vector<std::unique_ptr<MutilColumnBlockPredicate>> _push_down_predicates;
346353
std::vector<std::shared_ptr<ColumnPredicate>> _useless_predicates;
347354
Arena _arena;
355+
356+
// when creating a new row group reader, call this function to get the latest runtime filter conjuncts.
357+
// The default implementation does nothing, sets 'changed' to false, and returns OK.
358+
// This is used when iceberg read position delete file ...
359+
static Status default_late_rf_func(bool* changed, VExprContextSPtrs&) {
360+
*changed = false;
361+
return Status::OK();
362+
}
363+
std::function<Status(bool*, VExprContextSPtrs&)> _call_late_rf_func = default_late_rf_func;
348364
};
349365
#include "common/compile_check_end.h"
350366

be/src/vec/exec/scan/file_scanner.cpp

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -354,15 +354,19 @@ Status FileScanner::_process_conjuncts() {
354354
return Status::OK();
355355
}
356356

357-
Status FileScanner::_process_late_arrival_conjuncts() {
357+
Status FileScanner::_process_late_arrival_conjuncts(bool* changed,
358+
VExprContextSPtrs& new_push_down_conjuncts) {
359+
*changed = false;
358360
if (_push_down_conjuncts.size() < _conjuncts.size()) {
361+
*changed = true;
359362
_push_down_conjuncts.clear();
360363
_push_down_conjuncts.resize(_conjuncts.size());
361364
for (size_t i = 0; i != _conjuncts.size(); ++i) {
362365
RETURN_IF_ERROR(_conjuncts[i]->clone(_state, _push_down_conjuncts[i]));
363366
}
364367
RETURN_IF_ERROR(_process_conjuncts());
365368
_discard_conjuncts();
369+
new_push_down_conjuncts = _push_down_conjuncts;
366370
}
367371
if (_applied_rf_num == _total_rf_num) {
368372
_local_state->scanner_profile()->add_info_string("ApplyAllRuntimeFilters", "True");
@@ -1048,9 +1052,17 @@ Status FileScanner::_get_next_reader() {
10481052
// ATTN: the push down agg type may be set back to NONE,
10491053
// see IcebergTableReader::init_row_filters for example.
10501054
parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
1051-
if (push_down_predicates) {
1052-
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
1053-
}
1055+
1056+
std::function<Status(bool*, VExprContextSPtrs&)> update_late_rf =
1057+
[&](bool* changed, VExprContextSPtrs& new_push_down_conjuncts) -> Status {
1058+
if (!_is_load) {
1059+
RETURN_IF_ERROR(try_append_late_arrival_runtime_filter());
1060+
RETURN_IF_ERROR(
1061+
_process_late_arrival_conjuncts(changed, new_push_down_conjuncts));
1062+
}
1063+
return Status::OK();
1064+
};
1065+
parquet_reader->set_update_late_rf_func(std::move(update_late_rf));
10541066
RETURN_IF_ERROR(_init_parquet_reader(std::move(parquet_reader), file_meta_cache_ptr));
10551067

10561068
need_to_get_parsed_schema = true;
@@ -1071,7 +1083,9 @@ Status FileScanner::_get_next_reader() {
10711083

10721084
orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
10731085
if (push_down_predicates) {
1074-
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
1086+
bool changed = false;
1087+
VExprContextSPtrs new_push_down_conjuncts;
1088+
RETURN_IF_ERROR(_process_late_arrival_conjuncts(&changed, new_push_down_conjuncts));
10751089
}
10761090
RETURN_IF_ERROR(_init_orc_reader(std::move(orc_reader), file_meta_cache_ptr));
10771091

be/src/vec/exec/scan/file_scanner.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,8 @@ class FileScanner : public Scanner {
253253
void _init_runtime_filter_partition_prune_block();
254254
Status _process_runtime_filters_partition_prune(bool& is_partition_pruned);
255255
Status _process_conjuncts();
256-
Status _process_late_arrival_conjuncts();
256+
Status _process_late_arrival_conjuncts(bool* changed,
257+
VExprContextSPtrs& new_push_down_conjuncts);
257258
void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
258259
Status _generate_truncate_columns(bool need_to_get_parsed_schema);
259260
Status _set_fill_or_truncate_columns(bool need_to_get_parsed_schema);

be/src/vec/exec/scan/scanner.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ Scanner::Scanner(RuntimeState* state, pipeline::ScanLocalStateBase* local_state,
4141
_output_tuple_desc(_local_state->output_tuple_desc()),
4242
_output_row_descriptor(_local_state->_parent->output_row_descriptor()),
4343
_has_prepared(false) {
44+
_total_rf_num = cast_set<int>(_local_state->_helper.runtime_filter_nums());
4445
DorisMetrics::instance()->scanner_cnt->increment(1);
4546
}
4647

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
use `default`;
2+
3+
create table fact_big (
4+
k INT,
5+
c1 INT,
6+
c2 BIGINT,
7+
c3 DOUBLE,
8+
c4 STRING
9+
)stored as parquet
10+
LOCATION '/user/doris/preinstalled_data/parquet_table/runtime_filter_fact_big';
11+
12+
create table dim_small (
13+
k INT,
14+
c1 INT,
15+
c2 BIGINT
16+
)stored as parquet
17+
LOCATION '/user/doris/preinstalled_data/parquet_table/runtime_filter_dim_small';
18+
19+
msck repair table fact_big;
20+
msck repair table dim_small;

0 commit comments

Comments
 (0)