2 changes: 2 additions & 0 deletions be/src/runtime_filter/runtime_filter_consumer_helper.h
@@ -52,6 +52,8 @@ class RuntimeFilterConsumerHelper {
// parent_operator_profile is owned by LocalState, so updating it here is safe.
void collect_realtime_profile(RuntimeProfile* parent_operator_profile);

size_t runtime_filter_nums() const { return _runtime_filter_descs.size(); }

private:
// Append late-arrival runtime filters to the vconjunct_ctx.
Status _append_rf_into_conjuncts(RuntimeState* state,
7 changes: 7 additions & 0 deletions be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -79,7 +79,14 @@ class RowGroupReader : public ProfileCollector {

// table name
struct LazyReadContext {
// All conjuncts: from the SQL statement, join runtime filters, and topn runtime filters.
VExprContextSPtrs conjuncts;

// ParquetReader::set_fill_columns(xxx, xxx) will set these two members
std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
fill_partition_columns;
std::unordered_map<std::string, VExprContextSPtr> fill_missing_columns;

bool can_lazy_read = false;
// block->rows() returns the number of rows of the first column,
// so we should check and resize the first column
37 changes: 30 additions & 7 deletions be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -405,11 +405,16 @@ bool ParquetReader::_type_matches(const VSlotRef* slot_ref) const {
!is_complex_type(table_col_type->get_primitive_type());
}

Status ParquetReader::set_fill_columns(
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
partition_columns,
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
SCOPED_RAW_TIMER(&_reader_statistics.parse_meta_time);
Status ParquetReader::_update_lazy_read_ctx(const VExprContextSPtrs& new_conjuncts) {
RowGroupReader::LazyReadContext new_lazy_read_ctx;
new_lazy_read_ctx.conjuncts = new_conjuncts;
new_lazy_read_ctx.fill_partition_columns = std::move(_lazy_read_ctx.fill_partition_columns);
new_lazy_read_ctx.fill_missing_columns = std::move(_lazy_read_ctx.fill_missing_columns);
Comment on lines +411 to +412
Copilot AI Dec 16, 2025

Moving from _lazy_read_ctx and then assigning back to it on line 416 leaves the source in a moved-from state. Consider copying these values instead of moving, or restructure to avoid the circular dependency.

Suggested change:
- new_lazy_read_ctx.fill_partition_columns = std::move(_lazy_read_ctx.fill_partition_columns);
- new_lazy_read_ctx.fill_missing_columns = std::move(_lazy_read_ctx.fill_missing_columns);
+ new_lazy_read_ctx.fill_partition_columns = _lazy_read_ctx.fill_partition_columns;
+ new_lazy_read_ctx.fill_missing_columns = _lazy_read_ctx.fill_missing_columns;
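
For readers unfamiliar with the hazard being flagged: after std::move, the source object is left in a valid but unspecified state, so any later read of it observes whatever the move left behind. A minimal self-contained sketch of the pattern, using a hypothetical Ctx struct as a stand-in for LazyReadContext (the names below are illustrative, not the actual Doris types):

#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for RowGroupReader::LazyReadContext.
struct Ctx {
    std::unordered_map<std::string, int> fill_partition_columns;
};

int main() {
    Ctx current;
    current.fill_partition_columns.emplace("dt", 1);

    Ctx fresh;
    // After this move, current.fill_partition_columns is valid but unspecified
    // (typically empty); reading it before it is reassigned would be a bug.
    fresh.fill_partition_columns = std::move(current.fill_partition_columns);

    // The pattern only works because the whole object is reassigned
    // immediately afterwards; the suggested copy avoids relying on that.
    current = std::move(fresh);
    return 0;
}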

_lazy_read_ctx = std::move(new_lazy_read_ctx);

_top_runtime_vexprs.clear();
_push_down_predicates.clear();

// std::unordered_map<column_name, std::pair<col_id, slot_id>>
std::unordered_map<std::string, std::pair<uint32_t, int>> predicate_columns;
// visit_slot for lazy materialization.
@@ -515,7 +520,7 @@ Status ParquetReader::set_fill_columns(
_lazy_read_ctx.all_predicate_col_ids.emplace_back(_row_id_column_iterator_pair.second);
}

for (auto& kv : partition_columns) {
for (auto& kv : _lazy_read_ctx.fill_partition_columns) {
auto iter = predicate_columns.find(kv.first);
if (iter == predicate_columns.end()) {
_lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
@@ -525,7 +530,7 @@ Status ParquetReader::set_fill_columns(
}
}

for (auto& kv : missing_columns) {
for (auto& kv : _lazy_read_ctx.fill_missing_columns) {
auto iter = predicate_columns.find(kv.first);
if (iter == predicate_columns.end()) {
_lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);
@@ -557,6 +562,17 @@ Status ParquetReader::set_fill_columns(
}
}

return Status::OK();
}

Status ParquetReader::set_fill_columns(
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
partition_columns,
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
_lazy_read_ctx.fill_partition_columns = partition_columns;
_lazy_read_ctx.fill_missing_columns = missing_columns;
RETURN_IF_ERROR(_update_lazy_read_ctx(_lazy_read_ctx.conjuncts));

if (_filter_groups && (_total_groups == 0 || _t_metadata->num_rows == 0 || _range_size < 0)) {
return Status::EndOfFile("No row group to read");
}
@@ -696,6 +712,13 @@ Status ParquetReader::_next_row_group_reader() {
continue;
}

bool has_late_rf_cond = false;
VExprContextSPtrs new_push_down_conjuncts;
RETURN_IF_ERROR(_call_late_rf_func(&has_late_rf_cond, new_push_down_conjuncts));
if (has_late_rf_cond) {
RETURN_IF_ERROR(_update_lazy_read_ctx(new_push_down_conjuncts));
}

size_t before_predicate_size = _push_down_predicates.size();
_push_down_predicates.reserve(before_predicate_size + _top_runtime_vexprs.size());
for (const auto& vexpr : _top_runtime_vexprs) {
16 changes: 16 additions & 0 deletions be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -164,6 +164,10 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {

bool count_read_rows() override { return true; }

void set_update_late_rf_func(std::function<Status(bool*, VExprContextSPtrs&)>&& func) {
_call_late_rf_func = std::move(func);
}

protected:
void _collect_profile_before_close() override;

@@ -256,6 +260,9 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {
bool _exists_in_file(const VSlotRef* slot) const override;
bool _type_matches(const VSlotRef*) const override;

// Update the lazy read context when the runtime filters change.
Status _update_lazy_read_ctx(const VExprContextSPtrs& new_conjuncts);

RuntimeProfile* _profile = nullptr;
const TFileScanRangeParams& _scan_params;
const TFileRangeDesc& _scan_range;
@@ -344,6 +351,15 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {
VExprSPtrs _top_runtime_vexprs;
std::vector<std::unique_ptr<MutilColumnBlockPredicate>> _push_down_predicates;
Arena _arena;

// When creating a new row group reader, call this function to get the latest runtime filter conjuncts.
// The default implementation does nothing, sets 'changed' to false, and returns OK.
// This is used when Iceberg reads position delete files ...
static Status default_late_rf_func(bool* changed, VExprContextSPtrs&) {
*changed = false;
return Status::OK();
}
std::function<Status(bool*, VExprContextSPtrs&)> _call_late_rf_func = default_late_rf_func;
};
#include "common/compile_check_end.h"

24 changes: 19 additions & 5 deletions be/src/vec/exec/scan/file_scanner.cpp
@@ -355,15 +355,19 @@ Status FileScanner::_process_conjuncts() {
return Status::OK();
}

Status FileScanner::_process_late_arrival_conjuncts() {
Status FileScanner::_process_late_arrival_conjuncts(bool* changed,
VExprContextSPtrs& new_push_down_conjuncts) {
*changed = false;
if (_push_down_conjuncts.size() < _conjuncts.size()) {
*changed = true;
_push_down_conjuncts.clear();
_push_down_conjuncts.resize(_conjuncts.size());
for (size_t i = 0; i != _conjuncts.size(); ++i) {
RETURN_IF_ERROR(_conjuncts[i]->clone(_state, _push_down_conjuncts[i]));
}
RETURN_IF_ERROR(_process_conjuncts());
_discard_conjuncts();
new_push_down_conjuncts = _push_down_conjuncts;
}
if (_applied_rf_num == _total_rf_num) {
_local_state->scanner_profile()->add_info_string("ApplyAllRuntimeFilters", "True");
@@ -1047,9 +1051,17 @@ Status FileScanner::_get_next_reader() {
// ATTN: the push down agg type may be set back to NONE,
// see IcebergTableReader::init_row_filters for example.
parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
if (push_down_predicates) {
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
}

std::function<Status(bool*, VExprContextSPtrs&)> update_late_rf =
[&](bool* changed, VExprContextSPtrs& new_push_down_conjuncts) -> Status {
if (!_is_load) {
RETURN_IF_ERROR(try_append_late_arrival_runtime_filter());
RETURN_IF_ERROR(
_process_late_arrival_conjuncts(changed, new_push_down_conjuncts));
}
return Status::OK();
};
parquet_reader->set_update_late_rf_func(std::move(update_late_rf));
RETURN_IF_ERROR(_init_parquet_reader(std::move(parquet_reader), file_meta_cache_ptr));

need_to_get_parsed_schema = true;
@@ -1070,7 +1082,9 @@

orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
if (push_down_predicates) {
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
bool changed = false;
VExprContextSPtrs new_push_down_conjuncts;
RETURN_IF_ERROR(_process_late_arrival_conjuncts(&changed, new_push_down_conjuncts));
}
RETURN_IF_ERROR(_init_orc_reader(std::move(orc_reader), file_meta_cache_ptr));

3 changes: 2 additions & 1 deletion be/src/vec/exec/scan/file_scanner.h
@@ -253,7 +253,8 @@ class FileScanner : public Scanner {
void _init_runtime_filter_partition_prune_block();
Status _process_runtime_filters_partition_prune(bool& is_partition_pruned);
Status _process_conjuncts();
Status _process_late_arrival_conjuncts();
Status _process_late_arrival_conjuncts(bool* changed,
VExprContextSPtrs& new_push_down_conjuncts);
void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
Status _generate_truncate_columns(bool need_to_get_parsed_schema);
Status _set_fill_or_truncate_columns(bool need_to_get_parsed_schema);
1 change: 1 addition & 0 deletions be/src/vec/exec/scan/scanner.cpp
@@ -41,6 +41,7 @@ Scanner::Scanner(RuntimeState* state, pipeline::ScanLocalStateBase* local_state,
_output_tuple_desc(_local_state->output_tuple_desc()),
_output_row_descriptor(_local_state->_parent->output_row_descriptor()),
_has_prepared(false) {
_total_rf_num = cast_set<int>(_local_state->_helper.runtime_filter_nums());
Copilot AI Dec 16, 2025

The function name cast_set is ambiguous and doesn't clearly convey its purpose. Consider renaming to something more descriptive like set_int_value or using direct casting syntax like static_cast<int>.

Suggested change:
- _total_rf_num = cast_set<int>(_local_state->_helper.runtime_filter_nums());
+ _total_rf_num = static_cast<int>(_local_state->_helper.runtime_filter_nums());
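
For context on the trade-off: cast_set in the Doris codebase presumably performs a checked narrowing conversion, which a bare static_cast would not. A rough sketch of what such a helper could look like (the name checked_narrow_cast and its exact behavior are assumptions for illustration, not Doris's actual implementation):

#include <cassert>
#include <cstddef>
#include <utility>  // std::in_range (C++20)

// Hypothetical checked narrowing cast: asserts the value fits in the
// destination type before converting, unlike a bare static_cast.
template <typename To, typename From>
To checked_narrow_cast(From value) {
    assert(std::in_range<To>(value) && "value does not fit in the destination type");
    return static_cast<To>(value);
}

int main() {
    std::size_t runtime_filter_nums = 3;
    int total_rf_num = checked_narrow_cast<int>(runtime_filter_nums);  // 3
    (void)total_rf_num;
    return 0;
}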

DorisMetrics::instance()->scanner_cnt->increment(1);
}

@@ -0,0 +1,20 @@
use `default`;

create table fact_big (
k INT,
c1 INT,
c2 BIGINT,
c3 DOUBLE,
c4 STRING
)stored as parquet
LOCATION '/user/doris/preinstalled_data/parquet_table/runtime_filter_fact_big';

create table dim_small (
k INT,
c1 INT,
c2 BIGINT
)stored as parquet
LOCATION '/user/doris/preinstalled_data/parquet_table/runtime_filter_dim_small';

msck repair table fact_big;
msck repair table dim_small;
@@ -0,0 +1,174 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import groovy.json.JsonSlurper

suite("test_parquet_join_runtime_filter", "p0,external,hive,external_docker,external_docker_hive") {

def getProfileList = {
def dst = 'http://' + context.config.feHttpAddress
def conn = new URL(dst + "/rest/v1/query_profile").openConnection()
conn.setRequestMethod("GET")
def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
(context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8"))
conn.setRequestProperty("Authorization", "Basic ${encoding}")
return conn.getInputStream().getText()
}

def getProfile = { id ->
def dst = 'http://' + context.config.feHttpAddress
def conn = new URL(dst + "/api/profile/text/?query_id=$id").openConnection()
conn.setRequestMethod("GET")
def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
(context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8"))
conn.setRequestProperty("Authorization", "Basic ${encoding}")
return conn.getInputStream().getText()
}


def extractFilteredGroupsValue = { String profileText ->
def values = (profileText =~ /RowGroupsFiltered:\s*(\d+)/).collect { it[1].toLong() }
return values.sort { a, b -> b <=> a }
}

def getProfileWithToken = { token ->
String profileId = ""
int attempts = 0
while (attempts < 10 && (profileId == null || profileId == "")) {
List profileData = new JsonSlurper().parseText(getProfileList()).data.rows
for (def profileItem in profileData) {
if (profileItem["Sql Statement"].toString().contains(token)) {
profileId = profileItem["Profile ID"].toString()
break
}
}
if (profileId == null || profileId == "") {
Thread.sleep(300)
}
attempts++
}
assertTrue(profileId != null && profileId != "")
Thread.sleep(800)
return getProfile(profileId).toString()
}
// session vars
sql "unset variable all;"
sql "set profile_level=2;"
sql "set enable_profile=true;"


String enabled = context.config.otherConfigs.get("enableHiveTest")
if (!"true".equalsIgnoreCase(enabled)) {
return;
}
for (String hivePrefix : ["hive2"]) {
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
String hmsPort = context.config.otherConfigs.get(hivePrefix + "HmsPort")
String catalog_name = "test_parquet_join_runtime_filter"

sql """drop catalog if exists ${catalog_name};"""
sql """
create catalog if not exists ${catalog_name} properties (
'type'='hms',
'hadoop.username' = 'hadoop',
'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}'
);
"""
logger.info("catalog " + catalog_name + " created")
sql """switch ${catalog_name};"""
logger.info("switched to catalog " + catalog_name)

sql """ use `default` """


for (int wait_time : [0, 10, 100]) {
sql """ set runtime_filter_wait_time_ms = ${wait_time}; """

def f1 = {
def t1 = UUID.randomUUID().toString()
def sql_result = sql """
select *, "${t1}" from fact_big as a join dim_small as b on a.k = b.k where b.c1 = 5
"""
def filter_result = extractFilteredGroupsValue(getProfileWithToken(t1));
logger.info("sql_result = ${sql_result}");
logger.info("filter_result = ${filter_result}");

assertTrue(filter_result.size() == 2)
assertTrue(filter_result[0] > 40)
}



def f2 = {
def t1 = UUID.randomUUID().toString()
def sql_result = sql """
select *, "${t1}" from fact_big as a join dim_small as b on a.k = b.k where b.c1 in (1,2)
"""
def filter_result = extractFilteredGroupsValue(getProfileWithToken(t1));
logger.info("sql_result = ${sql_result}");
logger.info("filter_result = ${filter_result}");

assertTrue(filter_result.size() == 2)
assertTrue(filter_result[0] > 30)
}




def f3 = {
def t1 = UUID.randomUUID().toString()
def sql_result = sql """
select *, "${t1}" from fact_big as a join dim_small as b on a.k = b.k where b.c1 < 3
"""
def filter_result = extractFilteredGroupsValue(getProfileWithToken(t1));
logger.info("sql_result = ${sql_result}");
logger.info("filter_result = ${filter_result}");

assertTrue(filter_result.size() == 2)
assertTrue(filter_result[0] > 30)
}



def f4 = {
def t1 = UUID.randomUUID().toString()
def sql_result = sql """
select *, "${t1}" from fact_big as a join dim_small as b on a.k = b.k where b.c2 >= 50
"""
def filter_result = extractFilteredGroupsValue(getProfileWithToken(t1));
logger.info("sql_result = ${sql_result}");
logger.info("filter_result = ${filter_result}");

assertTrue(filter_result.size() == 2)
assertTrue(filter_result[0] > 40)
}


f1()
f2()
f3()
f4()
}

sql """drop catalog ${catalog_name};"""
}





}