2 changes: 2 additions & 0 deletions be/src/runtime_filter/runtime_filter_consumer_helper.h
@@ -52,6 +52,8 @@ class RuntimeFilterConsumerHelper {
// parent_operator_profile is owned by LocalState, so updating it here is safe.
void collect_realtime_profile(RuntimeProfile* parent_operator_profile);

size_t runtime_filter_nums() const { return _runtime_filter_descs.size(); }

private:
// Append late-arrival runtime filters to the vconjunct_ctx.
Status _append_rf_into_conjuncts(RuntimeState* state,
7 changes: 7 additions & 0 deletions be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -79,7 +79,14 @@ class RowGroupReader : public ProfileCollector {

// table name
struct LazyReadContext {
// All conjuncts: predicates from the SQL statement, join runtime filters, and topn runtime filters.
VExprContextSPtrs conjuncts;

// ParquetReader::set_fill_columns() populates these two members.
std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
fill_partition_columns;
std::unordered_map<std::string, VExprContextSPtr> fill_missing_columns;

bool can_lazy_read = false;
// block->rows() returns the number of rows of the first column,
// so we should check and resize the first column
37 changes: 30 additions & 7 deletions be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -388,11 +388,16 @@ bool ParquetReader::_type_matches(const VSlotRef* slot_ref) const {
!is_complex_type(table_col_type->get_primitive_type());
}

Status ParquetReader::set_fill_columns(
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
partition_columns,
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
Status ParquetReader::_update_lazy_read_ctx(const VExprContextSPtrs& new_conjuncts) {
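// Rebuild the lazy read context around the new conjuncts while carrying over the
// fill_partition_columns / fill_missing_columns supplied earlier via set_fill_columns().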
RowGroupReader::LazyReadContext new_lazy_read_ctx;
new_lazy_read_ctx.conjuncts = new_conjuncts;
new_lazy_read_ctx.fill_partition_columns = std::move(_lazy_read_ctx.fill_partition_columns);
new_lazy_read_ctx.fill_missing_columns = std::move(_lazy_read_ctx.fill_missing_columns);
_lazy_read_ctx = std::move(new_lazy_read_ctx);

_top_runtime_vexprs.clear();
_push_down_predicates.clear();

// std::unordered_map<column_name, std::pair<col_id, slot_id>>
std::unordered_map<std::string, std::pair<uint32_t, int>> predicate_columns;
// visit_slot for lazy mat.
@@ -499,7 +504,7 @@ Status ParquetReader::set_fill_columns(
_lazy_read_ctx.all_predicate_col_ids.emplace_back(_row_id_column_iterator_pair.second);
}

for (auto& kv : partition_columns) {
for (auto& kv : _lazy_read_ctx.fill_partition_columns) {
auto iter = predicate_columns.find(kv.first);
if (iter == predicate_columns.end()) {
_lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
}
}

for (auto& kv : missing_columns) {
for (auto& kv : _lazy_read_ctx.fill_missing_columns) {
auto iter = predicate_columns.find(kv.first);
if (iter == predicate_columns.end()) {
_lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);
@@ -541,6 +546,17 @@ Status ParquetReader::set_fill_columns(
}
}

return Status::OK();
}

Status ParquetReader::set_fill_columns(
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
partition_columns,
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
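// Remember the partition/missing column fillers so later _update_lazy_read_ctx() calls
// (triggered by late-arriving runtime filters) can rebuild the context without losing them.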
_lazy_read_ctx.fill_partition_columns = partition_columns;
_lazy_read_ctx.fill_missing_columns = missing_columns;
RETURN_IF_ERROR(_update_lazy_read_ctx(_lazy_read_ctx.conjuncts));

if (_filter_groups && (_total_groups == 0 || _t_metadata->num_rows == 0 || _range_size < 0)) {
return Status::EndOfFile("No row group to read");
}
@@ -678,6 +694,13 @@ Status ParquetReader::_next_row_group_reader() {
continue;
}

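// Ask the caller whether runtime filters arrived after this reader was created; if the
// pushed-down conjuncts changed, rebuild the lazy read context before opening this row group.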
bool has_late_rf_cond = false;
VExprContextSPtrs new_push_down_conjuncts;
RETURN_IF_ERROR(_call_late_rf_func(&has_late_rf_cond, new_push_down_conjuncts));
if (has_late_rf_cond) {
RETURN_IF_ERROR(_update_lazy_read_ctx(new_push_down_conjuncts));
}

size_t before_predicate_size = _push_down_predicates.size();
_push_down_predicates.reserve(before_predicate_size + _top_runtime_vexprs.size());
for (const auto& vexpr : _top_runtime_vexprs) {
16 changes: 16 additions & 0 deletions be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -162,6 +162,10 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {

bool count_read_rows() override { return true; }

void set_update_late_rf_func(std::function<Status(bool*, VExprContextSPtrs&)>&& func) {
_call_late_rf_func = std::move(func);
}

protected:
void _collect_profile_before_close() override;

@@ -254,6 +258,9 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {
bool _exists_in_file(const VSlotRef* slot) const override;
bool _type_matches(const VSlotRef*) const override;

// Update the lazy read context when the runtime filter conjuncts change.
Status _update_lazy_read_ctx(const VExprContextSPtrs& new_conjuncts);

RuntimeProfile* _profile = nullptr;
const TFileScanRangeParams& _scan_params;
const TFileRangeDesc& _scan_range;
@@ -341,6 +348,15 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {
std::vector<std::unique_ptr<MutilColumnBlockPredicate>> _push_down_predicates;
std::vector<std::unique_ptr<ColumnPredicate>> _useless_predicates;
Arena _arena;

// when creating a new row group reader, call this function to get the latest runtime filter conjuncts.
// The default implementation does nothing, sets 'changed' to false, and returns OK.
// The default is used, for example, when Iceberg reads a position delete file ...
static Status default_late_rf_func(bool* changed, VExprContextSPtrs&) {
*changed = false;
return Status::OK();
}
std::function<Status(bool*, VExprContextSPtrs&)> _call_late_rf_func = default_late_rf_func;
};
#include "common/compile_check_end.h"

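A minimal sketch of the new hook's contract (illustration only, not part of the patch; `reader` stands for any ParquetReader instance, and the lambda signature matches set_update_late_rf_func above):

// Caller side: install a hook that reports runtime filter conjuncts arriving after the reader was created.
reader->set_update_late_rf_func([&](bool* changed, VExprContextSPtrs& new_conjuncts) -> Status {
    *changed = false; // set to true and fill new_conjuncts once late filters have been applied
    return Status::OK();
});

// Reader side (per row group, see _next_row_group_reader above): consult the hook and rebuild
// the lazy read context only when the conjuncts actually changed.
bool changed = false;
VExprContextSPtrs new_conjuncts;
RETURN_IF_ERROR(_call_late_rf_func(&changed, new_conjuncts));
if (changed) {
    RETURN_IF_ERROR(_update_lazy_read_ctx(new_conjuncts));
}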
24 changes: 19 additions & 5 deletions be/src/vec/exec/scan/file_scanner.cpp
@@ -359,15 +359,19 @@ Status FileScanner::_process_conjuncts() {
return Status::OK();
}

Status FileScanner::_process_late_arrival_conjuncts() {
Status FileScanner::_process_late_arrival_conjuncts(bool* changed,
VExprContextSPtrs& new_push_down_conjuncts) {
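// 'changed' reports whether a fresh set of push-down conjuncts was produced; when it was,
// the newly cloned conjuncts are also handed back through new_push_down_conjuncts.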
*changed = false;
if (_push_down_conjuncts.size() < _conjuncts.size()) {
*changed = true;
_push_down_conjuncts.clear();
_push_down_conjuncts.resize(_conjuncts.size());
for (size_t i = 0; i != _conjuncts.size(); ++i) {
RETURN_IF_ERROR(_conjuncts[i]->clone(_state, _push_down_conjuncts[i]));
}
RETURN_IF_ERROR(_process_conjuncts());
_discard_conjuncts();
new_push_down_conjuncts = _push_down_conjuncts;
}
if (_applied_rf_num == _total_rf_num) {
_local_state->scanner_profile()->add_info_string("ApplyAllRuntimeFilters", "True");
@@ -1057,9 +1061,17 @@ Status FileScanner::_get_next_reader() {
// ATTN: the push down agg type may be set back to NONE,
// see IcebergTableReader::init_row_filters for example.
parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
if (push_down_predicates) {
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
}

std::function<Status(bool*, VExprContextSPtrs&)> update_late_rf =
[&](bool* changed, VExprContextSPtrs& new_push_down_conjuncts) -> Status {
if (!_is_load) {
RETURN_IF_ERROR(try_append_late_arrival_runtime_filter());
RETURN_IF_ERROR(
_process_late_arrival_conjuncts(changed, new_push_down_conjuncts));
}
return Status::OK();
};
parquet_reader->set_update_late_rf_func(std::move(update_late_rf));
RETURN_IF_ERROR(_init_parquet_reader(std::move(parquet_reader), file_meta_cache_ptr));

need_to_get_parsed_schema = true;
@@ -1080,7 +1092,9 @@

orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
if (push_down_predicates) {
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
bool changed = false;
VExprContextSPtrs new_push_down_conjuncts;
RETURN_IF_ERROR(_process_late_arrival_conjuncts(&changed, new_push_down_conjuncts));
}
RETURN_IF_ERROR(_init_orc_reader(std::move(orc_reader), file_meta_cache_ptr));

3 changes: 2 additions & 1 deletion be/src/vec/exec/scan/file_scanner.h
@@ -253,7 +253,8 @@ class FileScanner : public Scanner {
void _init_runtime_filter_partition_prune_block();
Status _process_runtime_filters_partition_prune(bool& is_partition_pruned);
Status _process_conjuncts();
Status _process_late_arrival_conjuncts();
Status _process_late_arrival_conjuncts(bool* changed,
VExprContextSPtrs& new_push_down_conjuncts);
void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
Status _generate_truncate_columns(bool need_to_get_parsed_schema);
Status _set_fill_or_truncate_columns(bool need_to_get_parsed_schema);
1 change: 1 addition & 0 deletions be/src/vec/exec/scan/scanner.cpp
@@ -41,6 +41,7 @@ Scanner::Scanner(RuntimeState* state, pipeline::ScanLocalStateBase* local_state,
_output_tuple_desc(_local_state->output_tuple_desc()),
_output_row_descriptor(_local_state->_parent->output_row_descriptor()),
_has_prepared(false) {
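// Cache how many runtime filters this scanner expects so late-arrival handling can tell
// when all of them have been applied (see the _applied_rf_num == _total_rf_num check above).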
_total_rf_num = cast_set<int>(_local_state->_helper.runtime_filter_nums());
DorisMetrics::instance()->scanner_cnt->increment(1);
}

@@ -0,0 +1,20 @@
use `default`;

create table fact_big (
k INT,
c1 INT,
c2 BIGINT,
c3 DOUBLE,
c4 STRING
) stored as parquet
LOCATION '/user/doris/preinstalled_data/parquet_table/runtime_filter_fact_big';

create table dim_small (
k INT,
c1 INT,
c2 BIGINT
) stored as parquet
LOCATION '/user/doris/preinstalled_data/parquet_table/runtime_filter_dim_small';

msck repair table fact_big;
msck repair table dim_small;
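A query of the shape this data presumably exercises (hypothetical, not part of this diff): a selective predicate on the small dimension table so the join produces a runtime filter on fact_big.k that can prune Parquet row groups.

select count(*), sum(f.c3)
from fact_big f join dim_small d on f.k = d.k
where d.c1 < 100;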