diff --git a/be/src/runtime_filter/runtime_filter_consumer_helper.h b/be/src/runtime_filter/runtime_filter_consumer_helper.h
index 212df4338cbdd8..36da3cd10c0167 100644
--- a/be/src/runtime_filter/runtime_filter_consumer_helper.h
+++ b/be/src/runtime_filter/runtime_filter_consumer_helper.h
@@ -52,6 +52,8 @@ class RuntimeFilterConsumerHelper {
     // parent_operator_profile is owned by LocalState so update it is safe at here.
     void collect_realtime_profile(RuntimeProfile* parent_operator_profile);
 
+    size_t runtime_filter_nums() const { return _runtime_filter_descs.size(); }
+
 private:
     // Append late-arrival runtime filters to the vconjunct_ctx.
     Status _append_rf_into_conjuncts(RuntimeState* state,
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index 095e5dbaa056e0..2129837f35d492 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -79,7 +79,14 @@ class RowGroupReader : public ProfileCollector {
     // table name
     struct LazyReadContext {
+        // All conjuncts: predicates from the SQL, join runtime filters, and topn runtime filters.
        VExprContextSPtrs conjuncts;
+
+        // ParquetReader::set_fill_columns(xxx, xxx) sets these two members.
+        std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
+                fill_partition_columns;
+        std::unordered_map<std::string, VExprContextSPtr> fill_missing_columns;
+
        bool can_lazy_read = false;
        // block->rows() returns the number of rows of the first column,
        // so we should check and resize the first column
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 449ce9fd0e775f..75f7365dade572 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -405,11 +405,16 @@ bool ParquetReader::_type_matches(const VSlotRef* slot_ref) const {
            !is_complex_type(table_col_type->get_primitive_type());
 }
 
-Status ParquetReader::set_fill_columns(
-        const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
-                partition_columns,
-        const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
-    SCOPED_RAW_TIMER(&_reader_statistics.parse_meta_time);
+Status ParquetReader::_update_lazy_read_ctx(const VExprContextSPtrs& new_conjuncts) {
+    RowGroupReader::LazyReadContext new_lazy_read_ctx;
+    new_lazy_read_ctx.conjuncts = new_conjuncts;
+    new_lazy_read_ctx.fill_partition_columns = std::move(_lazy_read_ctx.fill_partition_columns);
+    new_lazy_read_ctx.fill_missing_columns = std::move(_lazy_read_ctx.fill_missing_columns);
+    _lazy_read_ctx = std::move(new_lazy_read_ctx);
+
+    _top_runtime_vexprs.clear();
+    _push_down_predicates.clear();
+
     // std::unordered_map<column_name, std::pair<col_id, slot_id>>
     std::unordered_map<std::string, std::pair<uint32_t, int>> predicate_columns;
     // visit_slot for lazy mat.
@@ -515,7 +520,7 @@ Status ParquetReader::set_fill_columns(
         _lazy_read_ctx.all_predicate_col_ids.emplace_back(_row_id_column_iterator_pair.second);
     }
 
-    for (auto& kv : partition_columns) {
+    for (auto& kv : _lazy_read_ctx.fill_partition_columns) {
         auto iter = predicate_columns.find(kv.first);
         if (iter == predicate_columns.end()) {
             _lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
@@ -525,7 +530,7 @@
         }
     }
 
-    for (auto& kv : missing_columns) {
+    for (auto& kv : _lazy_read_ctx.fill_missing_columns) {
         auto iter = predicate_columns.find(kv.first);
         if (iter == predicate_columns.end()) {
             _lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);
@@ -557,6 +562,17 @@
         }
     }
 
+    return Status::OK();
+}
+
+Status ParquetReader::set_fill_columns(
+        const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
+                partition_columns,
+        const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
+    _lazy_read_ctx.fill_partition_columns = partition_columns;
+    _lazy_read_ctx.fill_missing_columns = missing_columns;
+    RETURN_IF_ERROR(_update_lazy_read_ctx(_lazy_read_ctx.conjuncts));
+
     if (_filter_groups && (_total_groups == 0 || _t_metadata->num_rows == 0 || _range_size < 0)) {
         return Status::EndOfFile("No row group to read");
     }
@@ -696,6 +712,13 @@ Status ParquetReader::_next_row_group_reader() {
             continue;
         }
 
+        bool has_late_rf_cond = false;
+        VExprContextSPtrs new_push_down_conjuncts;
+        RETURN_IF_ERROR(_call_late_rf_func(&has_late_rf_cond, new_push_down_conjuncts));
+        if (has_late_rf_cond) {
+            RETURN_IF_ERROR(_update_lazy_read_ctx(new_push_down_conjuncts));
+        }
+
         size_t before_predicate_size = _push_down_predicates.size();
         _push_down_predicates.reserve(before_predicate_size + _top_runtime_vexprs.size());
         for (const auto& vexpr : _top_runtime_vexprs) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index d639feff5ff7e9..e5c94f6f3af6bd 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -164,6 +164,10 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {
 
     bool count_read_rows() override { return true; }
 
+    void set_update_late_rf_func(std::function<Status(bool*, VExprContextSPtrs&)>&& func) {
+        _call_late_rf_func = std::move(func);
+    }
+
 protected:
     void _collect_profile_before_close() override;
 
@@ -256,6 +260,9 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {
     bool _exists_in_file(const VSlotRef* slot) const override;
     bool _type_matches(const VSlotRef*) const override;
 
+    // Update the lazy read context when the runtime filter conjuncts change.
+    Status _update_lazy_read_ctx(const VExprContextSPtrs& new_conjuncts);
+
     RuntimeProfile* _profile = nullptr;
     const TFileScanRangeParams& _scan_params;
     const TFileRangeDesc& _scan_range;
@@ -344,6 +351,15 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {
     VExprSPtrs _top_runtime_vexprs;
     std::vector> _push_down_predicates;
     Arena _arena;
+
+    // When creating a new row group reader, call this function to get the latest runtime filter conjuncts.
+    // The default implementation does nothing, sets 'changed' to false, and returns OK.
+    // This is used when Iceberg reads position delete files ...
+    static Status default_late_rf_func(bool* changed, VExprContextSPtrs&) {
+        *changed = false;
+        return Status::OK();
+    }
+    std::function<Status(bool*, VExprContextSPtrs&)> _call_late_rf_func = default_late_rf_func;
 };
 
 #include "common/compile_check_end.h"
diff --git a/be/src/vec/exec/scan/file_scanner.cpp b/be/src/vec/exec/scan/file_scanner.cpp
index ae6414bc84cf02..0ee4c61503d271 100644
--- a/be/src/vec/exec/scan/file_scanner.cpp
+++ b/be/src/vec/exec/scan/file_scanner.cpp
@@ -355,8 +355,11 @@ Status FileScanner::_process_conjuncts() {
     return Status::OK();
 }
 
-Status FileScanner::_process_late_arrival_conjuncts() {
+Status FileScanner::_process_late_arrival_conjuncts(bool* changed,
+                                                    VExprContextSPtrs& new_push_down_conjuncts) {
+    *changed = false;
     if (_push_down_conjuncts.size() < _conjuncts.size()) {
+        *changed = true;
         _push_down_conjuncts.clear();
         _push_down_conjuncts.resize(_conjuncts.size());
         for (size_t i = 0; i != _conjuncts.size(); ++i) {
@@ -364,6 +367,7 @@
         }
         RETURN_IF_ERROR(_process_conjuncts());
         _discard_conjuncts();
+        new_push_down_conjuncts = _push_down_conjuncts;
     }
     if (_applied_rf_num == _total_rf_num) {
         _local_state->scanner_profile()->add_info_string("ApplyAllRuntimeFilters", "True");
@@ -1047,9 +1051,17 @@ Status FileScanner::_get_next_reader() {
             // ATTN: the push down agg type may be set back to NONE,
             // see IcebergTableReader::init_row_filters for example.
             parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
-            if (push_down_predicates) {
-                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
-            }
+
+            std::function<Status(bool*, VExprContextSPtrs&)> update_late_rf =
+                    [&](bool* changed, VExprContextSPtrs& new_push_down_conjuncts) -> Status {
+                if (!_is_load) {
+                    RETURN_IF_ERROR(try_append_late_arrival_runtime_filter());
+                    RETURN_IF_ERROR(
+                            _process_late_arrival_conjuncts(changed, new_push_down_conjuncts));
+                }
+                return Status::OK();
+            };
+            parquet_reader->set_update_late_rf_func(std::move(update_late_rf));
 
             RETURN_IF_ERROR(_init_parquet_reader(std::move(parquet_reader), file_meta_cache_ptr));
             need_to_get_parsed_schema = true;
@@ -1070,7 +1082,9 @@
             orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
 
             if (push_down_predicates) {
-                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
+                bool changed = false;
+                VExprContextSPtrs new_push_down_conjuncts;
+                RETURN_IF_ERROR(_process_late_arrival_conjuncts(&changed, new_push_down_conjuncts));
             }
 
             RETURN_IF_ERROR(_init_orc_reader(std::move(orc_reader), file_meta_cache_ptr));
diff --git a/be/src/vec/exec/scan/file_scanner.h b/be/src/vec/exec/scan/file_scanner.h
index f8b68200806556..bc8149fd3b2f92 100644
--- a/be/src/vec/exec/scan/file_scanner.h
+++ b/be/src/vec/exec/scan/file_scanner.h
@@ -253,7 +253,8 @@ class FileScanner : public Scanner {
     void _init_runtime_filter_partition_prune_block();
     Status _process_runtime_filters_partition_prune(bool& is_partition_pruned);
     Status _process_conjuncts();
-    Status _process_late_arrival_conjuncts();
+    Status _process_late_arrival_conjuncts(bool* changed,
+                                           VExprContextSPtrs& new_push_down_conjuncts);
     void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
     Status _generate_truncate_columns(bool need_to_get_parsed_schema);
     Status _set_fill_or_truncate_columns(bool need_to_get_parsed_schema);
diff --git a/be/src/vec/exec/scan/scanner.cpp b/be/src/vec/exec/scan/scanner.cpp
index d321075435b2eb..3bd1b486938179 100644
--- a/be/src/vec/exec/scan/scanner.cpp
+++ b/be/src/vec/exec/scan/scanner.cpp
@@ -41,6 +41,7 @@ Scanner::Scanner(RuntimeState* state,
                  pipeline::ScanLocalStateBase* local_state,
           _output_tuple_desc(_local_state->output_tuple_desc()),
           _output_row_descriptor(_local_state->_parent->output_row_descriptor()),
           _has_prepared(false) {
+    _total_rf_num = cast_set<int>(_local_state->_helper.runtime_filter_nums());
     DorisMetrics::instance()->scanner_cnt->increment(1);
 }
diff --git a/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run84.hql b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run84.hql
new file mode 100644
index 00000000000000..4b4e7b6e549b29
--- /dev/null
+++ b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run84.hql
@@ -0,0 +1,20 @@
+use `default`;
+
+create table fact_big (
+    k INT,
+    c1 INT,
+    c2 BIGINT,
+    c3 DOUBLE,
+    c4 STRING
+) stored as parquet
+LOCATION '/user/doris/preinstalled_data/parquet_table/runtime_filter_fact_big';
+
+create table dim_small (
+    k INT,
+    c1 INT,
+    c2 BIGINT
+) stored as parquet
+LOCATION '/user/doris/preinstalled_data/parquet_table/runtime_filter_dim_small';
+
+msck repair table fact_big;
+msck repair table dim_small;
diff --git a/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/runtime_filter_dim_small/dim_small.parquet b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/runtime_filter_dim_small/dim_small.parquet
new file mode 100644
index 00000000000000..e998f3c817a374
Binary files /dev/null and b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/runtime_filter_dim_small/dim_small.parquet differ
diff --git a/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/runtime_filter_fact_big/fact_big.parquet b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/runtime_filter_fact_big/fact_big.parquet
new file mode 100644
index 00000000000000..b3ad736022e91b
Binary files /dev/null and b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/runtime_filter_fact_big/fact_big.parquet differ
diff --git a/regression-test/suites/external_table_p0/hive/test_parquet_join_runtime_filter.groovy b/regression-test/suites/external_table_p0/hive/test_parquet_join_runtime_filter.groovy
new file mode 100644
index 00000000000000..c5990fd2fa2eac
--- /dev/null
+++ b/regression-test/suites/external_table_p0/hive/test_parquet_join_runtime_filter.groovy
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import groovy.json.JsonSlurper
+
+suite("test_parquet_join_runtime_filter", "p0,external,hive,external_docker,external_docker_hive") {
+
+    def getProfileList = {
+        def dst = 'http://' + context.config.feHttpAddress
+        def conn = new URL(dst + "/rest/v1/query_profile").openConnection()
+        conn.setRequestMethod("GET")
+        def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
+                (context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8"))
+        conn.setRequestProperty("Authorization", "Basic ${encoding}")
+        return conn.getInputStream().getText()
+    }
+
+    def getProfile = { id ->
+        def dst = 'http://' + context.config.feHttpAddress
+        def conn = new URL(dst + "/api/profile/text/?query_id=$id").openConnection()
+        conn.setRequestMethod("GET")
+        def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
+                (context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8"))
+        conn.setRequestProperty("Authorization", "Basic ${encoding}")
+        return conn.getInputStream().getText()
+    }
+
+    def extractFilteredGroupsValue = { String profileText ->
+        def values = (profileText =~ /RowGroupsFiltered:\s*(\d+)/).collect { it[1].toLong() }
+        return values.sort { a, b -> b <=> a }
+    }
+
+    def getProfileWithToken = { token ->
+        String profileId = ""
+        int attempts = 0
+        while (attempts < 10 && (profileId == null || profileId == "")) {
+            List profileData = new JsonSlurper().parseText(getProfileList()).data.rows
+            for (def profileItem in profileData) {
+                if (profileItem["Sql Statement"].toString().contains(token)) {
+                    profileId = profileItem["Profile ID"].toString()
+                    break
+                }
+            }
+            if (profileId == null || profileId == "") {
+                Thread.sleep(300)
+            }
+            attempts++
+        }
+        assertTrue(profileId != null && profileId != "")
+        Thread.sleep(800)
+        return getProfile(profileId).toString()
+    }
+
+    // session vars
+    sql "unset variable all;"
+    sql "set profile_level=2;"
+    sql "set enable_profile=true;"
+
+    String enabled = context.config.otherConfigs.get("enableHiveTest")
+    if (!"true".equalsIgnoreCase(enabled)) {
+        return;
+    }
+    for (String hivePrefix : ["hive2"]) {
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String hmsPort = context.config.otherConfigs.get(hivePrefix + "HmsPort")
+        String catalog_name = "test_parquet_join_runtime_filter"
+
+        sql """drop catalog if exists ${catalog_name};"""
+        sql """
+            create catalog if not exists ${catalog_name} properties (
+                'type'='hms',
+                'hadoop.username' = 'hadoop',
+                'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}'
+            );
+        """
+        logger.info("catalog " + catalog_name + " created")
+        sql """switch ${catalog_name};"""
+        logger.info("switched to catalog " + catalog_name)
+
+        sql """ use `default` """
+
+        for (int wait_time : [0, 10, 100]) {
+            sql """ set runtime_filter_wait_time_ms = ${wait_time}; """
+
+            def f1 = {
+                def t1 = UUID.randomUUID().toString()
+                def sql_result = sql """
+                    select *, "${t1}" from fact_big as a join dim_small as b on a.k = b.k where b.c1 = 5
+                """
+                def filter_result = extractFilteredGroupsValue(getProfileWithToken(t1));
+                logger.info("sql_result = ${sql_result}");
+                logger.info("filter_result = ${filter_result}");
+
+                assertTrue(filter_result.size() == 2)
+                assertTrue(filter_result[0] > 40)
+            }
+
+            def f2 = {
+                def t1 = UUID.randomUUID().toString()
+                def sql_result = sql """
+                    select *, "${t1}" from fact_big as a join dim_small as b on a.k = b.k where b.c1 in (1,2)
+                """
+                def filter_result =
+                        extractFilteredGroupsValue(getProfileWithToken(t1));
+                logger.info("sql_result = ${sql_result}");
+                logger.info("filter_result = ${filter_result}");
+
+                assertTrue(filter_result.size() == 2)
+                assertTrue(filter_result[0] > 30)
+            }
+
+            def f3 = {
+                def t1 = UUID.randomUUID().toString()
+                def sql_result = sql """
+                    select *, "${t1}" from fact_big as a join dim_small as b on a.k = b.k where b.c1 < 3
+                """
+                def filter_result = extractFilteredGroupsValue(getProfileWithToken(t1));
+                logger.info("sql_result = ${sql_result}");
+                logger.info("filter_result = ${filter_result}");
+
+                assertTrue(filter_result.size() == 2)
+                assertTrue(filter_result[0] > 30)
+            }
+
+            def f4 = {
+                def t1 = UUID.randomUUID().toString()
+                def sql_result = sql """
+                    select *, "${t1}" from fact_big as a join dim_small as b on a.k = b.k where b.c2 >= 50
+                """
+                def filter_result = extractFilteredGroupsValue(getProfileWithToken(t1));
+                logger.info("sql_result = ${sql_result}");
+                logger.info("filter_result = ${filter_result}");
+
+                assertTrue(filter_result.size() == 2)
+                assertTrue(filter_result[0] > 40)
+            }
+
+            f1()
+            f2()
+            f3()
+            f4()
+        }
+
+        sql """drop catalog ${catalog_name};"""
+    }
+}
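
Reviewer note on the mechanism (commentary, not part of the patch). The change replaces a one-shot push-down of late-arrival runtime filters with a pull-based contract: FileScanner installs a callback on ParquetReader via set_update_late_rf_func(), and _next_row_group_reader() polls it before opening each row group; only when the callback reports a change does the reader rebuild its LazyReadContext and re-derive its push-down predicates. The sketch below models that contract in isolation so it can be read and compiled without the Doris tree. MiniStatus, MiniReader, Conjuncts, and everything around them are hypothetical stand-ins; only the callback shape (bool* changed, conjuncts&) -> Status and the no-op default come from the patch.

#include <functional>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for doris::Status and VExprContextSPtrs.
struct MiniStatus {
    static MiniStatus OK() { return {}; }
};
using Conjuncts = std::vector<std::string>;

// Models the reader side: polls the callback once per row group, the way
// _next_row_group_reader() polls _call_late_rf_func in the patch.
class MiniReader {
public:
    using LateRfFunc = std::function<MiniStatus(bool*, Conjuncts&)>;

    // Mirrors default_late_rf_func: do nothing and report "no change".
    static MiniStatus default_late_rf(bool* changed, Conjuncts&) {
        *changed = false;
        return MiniStatus::OK();
    }

    void set_update_late_rf_func(LateRfFunc&& func) { _late_rf = std::move(func); }

    void next_row_group() {
        bool changed = false;
        Conjuncts latest;
        _late_rf(&changed, latest);
        if (changed) {
            // Stands in for _update_lazy_read_ctx(): rebuild around the new
            // conjuncts and drop predicates derived from the old ones.
            _conjuncts = std::move(latest);
        }
        std::cout << "row group " << _group++ << ": " << _conjuncts.size()
                  << " conjunct(s) pushed down\n";
    }

private:
    LateRfFunc _late_rf = default_late_rf;  // same default wiring as the patch
    Conjuncts _conjuncts;
    int _group = 0;
};

int main() {
    MiniReader reader;
    Conjuncts pending;  // filters that have arrived but are not pushed down yet

    // Models the scanner side of the lambda installed in _get_next_reader().
    reader.set_update_late_rf_func([&](bool* changed, Conjuncts& out) -> MiniStatus {
        *changed = !pending.empty();
        if (*changed) {
            out = std::move(pending);
            pending.clear();
        }
        return MiniStatus::OK();
    });

    reader.next_row_group();                 // group 0: nothing arrived yet
    pending = {"a.k IN (build-side keys)"};  // a join runtime filter arrives mid-scan
    reader.next_row_group();                 // group 1: change detected, ctx rebuilt
    reader.next_row_group();                 // group 2: no change, no rebuild
    return 0;
}

The regression test exercises exactly this shape: with runtime_filter_wait_time_ms as low as 0 the scan can start before the join filter arrives, yet later row groups can still be pruned, which is presumably why the test asserts on RowGroupsFiltered counts rather than on behavior of the first row group.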
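A second note, on _update_lazy_read_ctx() itself: set_fill_columns() now stashes the caller-supplied partition/missing-column maps inside LazyReadContext, and every rebuild moves them from the old context into the fresh one before reclassifying columns, so a mid-scan conjunct change can be applied without the scanner re-supplying those maps. Below is a compilable miniature of that move-through-rebuild pattern; MiniLazyCtx and its fields are hypothetical, only the move-and-swap structure mirrors the patch.

#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical miniature of RowGroupReader::LazyReadContext.
struct MiniLazyCtx {
    std::vector<std::string> conjuncts;                      // derived: changes per rebuild
    std::unordered_map<std::string, std::string> fill_part;  // caller-supplied: must survive
    std::unordered_map<std::string, std::string> fill_miss;  // caller-supplied: must survive
};

// Mirrors _update_lazy_read_ctx: derived state is recomputed from new_conjuncts,
// while the caller-supplied fill maps are moved, not copied, into the new context.
void rebuild(MiniLazyCtx& ctx, std::vector<std::string> new_conjuncts) {
    MiniLazyCtx fresh;
    fresh.conjuncts = std::move(new_conjuncts);
    fresh.fill_part = std::move(ctx.fill_part);
    fresh.fill_miss = std::move(ctx.fill_miss);
    ctx = std::move(fresh);  // stale derived state is dropped wholesale
}

int main() {
    MiniLazyCtx ctx;
    ctx.fill_part = {{"dt", "2024-01-01"}};  // set once, like set_fill_columns
    rebuild(ctx, {"c1 = 5"});                // initial build
    rebuild(ctx, {"c1 = 5", "k IN (rf)"});   // late RF arrives: fill maps survive
    return ctx.fill_part.size() == 1 ? 0 : 1;
}

Rebuilding a fresh context and move-assigning it, rather than mutating fields in place, keeps the invariant visible at the call site: everything derived from the old conjuncts is discarded together, which matches the patch also clearing _top_runtime_vexprs and _push_down_predicates in the same function.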