diff --git a/be/src/exec/olap_common.h b/be/src/exec/olap_common.h index a0c672d707abf5..7fc00daf7b61b7 100644 --- a/be/src/exec/olap_common.h +++ b/be/src/exec/olap_common.h @@ -212,16 +212,19 @@ class ColumnValueRange { int scale() const { return _scale; } - static void add_fixed_value_range(ColumnValueRange& range, + static void add_fixed_value_range(ColumnValueRange& range, SQLFilterOp op, const CppType* value) { static_cast(range.add_fixed_value(*value)); } - static void remove_fixed_value_range(ColumnValueRange& range, + static void remove_fixed_value_range(ColumnValueRange& range, SQLFilterOp op, const CppType* value) { range.remove_fixed_value(*value); } + static void empty_function(ColumnValueRange& range, SQLFilterOp op, + const CppType* value) {} + static void add_value_range(ColumnValueRange& range, SQLFilterOp op, const CppType* value) { static_cast(range.add_range(op, *value)); diff --git a/be/src/exec/olap_utils.h b/be/src/exec/olap_utils.h index 444df52a009f4d..d192ed1d49693c 100644 --- a/be/src/exec/olap_utils.h +++ b/be/src/exec/olap_utils.h @@ -67,43 +67,14 @@ enum SQLFilterOp { FILTER_LESS = 2, FILTER_LESS_OR_EQUAL = 3, FILTER_IN = 4, - FILTER_NOT_IN = 5 + FILTER_NOT_IN = 5, + FILTER_EQ = 6, + FILTER_NE = 7 }; template constexpr bool always_false_v = false; -inline SQLFilterOp to_olap_filter_type(TExprOpcode::type type, bool opposite) { - switch (type) { - case TExprOpcode::LT: - return opposite ? FILTER_LARGER : FILTER_LESS; - - case TExprOpcode::LE: - return opposite ? FILTER_LARGER_OR_EQUAL : FILTER_LESS_OR_EQUAL; - - case TExprOpcode::GT: - return opposite ? FILTER_LESS : FILTER_LARGER; - - case TExprOpcode::GE: - return opposite ? FILTER_LESS_OR_EQUAL : FILTER_LARGER_OR_EQUAL; - - case TExprOpcode::EQ: - return opposite ? FILTER_NOT_IN : FILTER_IN; - - case TExprOpcode::NE: - return opposite ? FILTER_IN : FILTER_NOT_IN; - - case TExprOpcode::EQ_FOR_NULL: - return FILTER_IN; - - default: - VLOG_CRITICAL << "TExprOpcode: " << type; - DCHECK(false); - } - - return FILTER_IN; -} - inline SQLFilterOp to_olap_filter_type(const std::string& function_name) { if (function_name == "lt") { return FILTER_LESS; @@ -114,9 +85,9 @@ inline SQLFilterOp to_olap_filter_type(const std::string& function_name) { } else if (function_name == "ge") { return FILTER_LARGER_OR_EQUAL; } else if (function_name == "eq") { - return FILTER_IN; + return FILTER_EQ; } else if (function_name == "ne") { - return FILTER_NOT_IN; + return FILTER_NE; } else if (function_name == "in") { return FILTER_IN; } else if (function_name == "not_in") { diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp index 6d5e0d7c24488b..ddb29ccdf51bd0 100644 --- a/be/src/pipeline/exec/file_scan_operator.cpp +++ b/be/src/pipeline/exec/file_scan_operator.cpp @@ -55,50 +55,6 @@ PushDownType FileScanLocalState::_should_push_down_binary_predicate( } } -bool FileScanLocalState::_should_push_down_or_predicate_recursively( - const vectorized::VExprSPtr& expr) const { - if (expr->node_type() == TExprNodeType::COMPOUND_PRED && - expr->op() == TExprOpcode::COMPOUND_OR) { - return std::ranges::all_of(expr->children(), [this](const vectorized::VExprSPtr& it) { - return _should_push_down_or_predicate_recursively(it); - }); - } else if (expr->node_type() == TExprNodeType::COMPOUND_PRED && - expr->op() == TExprOpcode::COMPOUND_AND) { - return std::ranges::any_of(expr->children(), [this](const vectorized::VExprSPtr& it) { - return _should_push_down_or_predicate_recursively(it); - }); - } else { - auto children = expr->children(); - if (children.empty() || children[0]->node_type() != TExprNodeType::SLOT_REF) { - // not a slot ref(column) - return false; - } - std::shared_ptr slot_ref = - std::dynamic_pointer_cast(children[0]); - auto entry = _slot_id_to_predicates.find(slot_ref->slot_id()); - if (_slot_id_to_predicates.end() == entry) { - return false; - } - if (is_complex_type(slot_ref->data_type()->get_primitive_type())) { - return false; - } - return true; - } -} - -PushDownType FileScanLocalState::_should_push_down_or_predicate( - const vectorized::VExprContext* expr_ctx) const { - // TODO(gabriel): Do not push down OR predicate for the time being. - // auto expr = expr_ctx->root()->get_impl() ? expr_ctx->root()->get_impl() : expr_ctx->root(); - // if (expr->node_type() == TExprNodeType::COMPOUND_PRED && - // expr->op() == TExprOpcode::COMPOUND_OR) { - // if (_should_push_down_or_predicate_recursively(expr)) { - // return PushDownType::PARTIAL_ACCEPTABLE; - // } - // } - return PushDownType::UNACCEPTABLE; -} - int FileScanLocalState::max_scanners_concurrency(RuntimeState* state) const { // For select * from table limit 10; should just use one thread. if (should_run_serial()) { diff --git a/be/src/pipeline/exec/file_scan_operator.h b/be/src/pipeline/exec/file_scan_operator.h index c682f30f409266..c2e1da398fee8f 100644 --- a/be/src/pipeline/exec/file_scan_operator.h +++ b/be/src/pipeline/exec/file_scan_operator.h @@ -82,9 +82,6 @@ class FileScanLocalState final : public ScanLocalState { PushDownType _should_push_down_binary_predicate( vectorized::VectorizedFnCall* fn_call, vectorized::VExprContext* expr_ctx, StringRef* constant_val, const std::set fn_name) const override; - PushDownType _should_push_down_or_predicate( - const vectorized::VExprContext* expr_ctx) const override; - bool _should_push_down_or_predicate_recursively(const vectorized::VExprSPtr& expr) const; std::shared_ptr _split_source = nullptr; int _max_scanners; // A in memory cache to save some common components diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index 923f0b5f42b689..2df58077d42385 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -450,19 +450,6 @@ Status OlapScanLocalState::_init_scanners(std::list* sc return Status::OK(); } SCOPED_TIMER(_scanner_init_timer); - - if (!_conjuncts.empty() && _state->enable_profile()) { - std::string message; - for (auto& conjunct : _conjuncts) { - if (conjunct->root()) { - if (!message.empty()) { - message += ", "; - } - message += conjunct->root()->debug_string(); - } - } - custom_profile()->add_info_string("RemainedDownPredicates", message); - } auto& p = _parent->cast(); for (auto uid : p._olap_scan_node.output_column_unique_ids) { @@ -834,23 +821,6 @@ void OlapScanLocalState::set_scan_ranges(RuntimeState* state, } } -static std::string predicates_to_string( - const phmap::flat_hash_map>>& - slot_id_to_predicates) { - fmt::memory_buffer debug_string_buffer; - for (const auto& [slot_id, predicates] : slot_id_to_predicates) { - if (predicates.empty()) { - continue; - } - fmt::format_to(debug_string_buffer, "Slot ID: {}: [", slot_id); - for (const auto& predicate : predicates) { - fmt::format_to(debug_string_buffer, "{{{}}}, ", predicate->debug_string()); - } - fmt::format_to(debug_string_buffer, "] "); - } - return fmt::to_string(debug_string_buffer); -} - static std::string tablets_id_to_string( const std::vector>& scan_ranges) { if (scan_ranges.empty()) { @@ -960,8 +930,6 @@ Status OlapScanLocalState::_build_key_ranges_and_filters() { } if (state()->enable_profile()) { - custom_profile()->add_info_string("PushDownPredicates", - predicates_to_string(_slot_id_to_predicates)); custom_profile()->add_info_string("KeyRanges", _scan_keys.debug_string()); custom_profile()->add_info_string("TabletIds", tablets_id_to_string(_scan_ranges)); } diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index e81a4ae620e285..9da8d31f00df4e 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -181,6 +181,23 @@ Status ScanLocalState::open(RuntimeState* state) { return status; } +static std::string predicates_to_string( + const phmap::flat_hash_map>>& + slot_id_to_predicates) { + fmt::memory_buffer debug_string_buffer; + for (const auto& [slot_id, predicates] : slot_id_to_predicates) { + if (predicates.empty()) { + continue; + } + fmt::format_to(debug_string_buffer, "Slot ID: {}: [", slot_id); + for (const auto& predicate : predicates) { + fmt::format_to(debug_string_buffer, "{{{}}}, ", predicate->debug_string()); + } + fmt::format_to(debug_string_buffer, "] "); + } + return fmt::to_string(debug_string_buffer); +} + template Status ScanLocalState::_normalize_conjuncts(RuntimeState* state) { auto& p = _parent->cast(); @@ -268,6 +285,21 @@ Status ScanLocalState::_normalize_conjuncts(RuntimeState* state) { ++it; } + if (state->enable_profile()) { + custom_profile()->add_info_string("PushDownPredicates", + predicates_to_string(_slot_id_to_predicates)); + std::string message; + for (auto& conjunct : _conjuncts) { + if (conjunct->root()) { + if (!message.empty()) { + message += ", "; + } + message += conjunct->root()->debug_string(); + } + } + custom_profile()->add_info_string("RemainedDownPredicates", message); + } + for (auto& it : _slot_id_to_value_range) { std::visit( [&](auto&& range) { @@ -314,39 +346,52 @@ Status ScanLocalState::_normalize_predicate(vectorized::VExprContext* c Status status = Status::OK(); std::visit( [&](auto& value_range) { - auto r = root; - RETURN_IF_PUSH_DOWN( - _normalize_in_and_eq_predicate(context, r, slot, - _slot_id_to_predicates[slot->id()], - value_range, &pdt), - status); - RETURN_IF_PUSH_DOWN( - _normalize_not_in_and_not_eq_predicate( - context, r, slot, _slot_id_to_predicates[slot->id()], - value_range, &pdt), - status); - RETURN_IF_PUSH_DOWN( - _normalize_is_null_predicate(context, r, slot, - _slot_id_to_predicates[slot->id()], - value_range, &pdt), - status); - RETURN_IF_PUSH_DOWN( - _normalize_noneq_binary_predicate(context, r, slot, - _slot_id_to_predicates[slot->id()], - value_range, &pdt), - status); - RETURN_IF_PUSH_DOWN( - _normalize_bitmap_filter(context, r, slot, - _slot_id_to_predicates[slot->id()], &pdt), - status); - RETURN_IF_PUSH_DOWN( - _normalize_bloom_filter(context, r, slot, - _slot_id_to_predicates[slot->id()], &pdt), - status); - RETURN_IF_PUSH_DOWN( - _normalize_topn_filter(context, r, slot, - _slot_id_to_predicates[slot->id()], &pdt), - status); + auto expr = root->is_rf_wrapper() ? root->get_impl() : root; + switch (expr->node_type()) { + case TExprNodeType::IN_PRED: + RETURN_IF_PUSH_DOWN( + _normalize_in_predicate(context, expr, slot, + _slot_id_to_predicates[slot->id()], + value_range, &pdt), + status); + break; + case TExprNodeType::BINARY_PRED: + RETURN_IF_PUSH_DOWN( + _normalize_binary_predicate(context, expr, slot, + _slot_id_to_predicates[slot->id()], + value_range, &pdt), + status); + break; + case TExprNodeType::FUNCTION_CALL: + if (expr->is_topn_filter()) { + RETURN_IF_PUSH_DOWN(_normalize_topn_filter( + context, expr, slot, + _slot_id_to_predicates[slot->id()], &pdt), + status); + } else { + RETURN_IF_PUSH_DOWN( + _normalize_is_null_predicate(context, expr, slot, + _slot_id_to_predicates[slot->id()], + value_range, &pdt), + status); + } + break; + case TExprNodeType::BITMAP_PRED: + RETURN_IF_PUSH_DOWN( + _normalize_bitmap_filter(context, root, slot, + _slot_id_to_predicates[slot->id()], &pdt), + status); + break; + case TExprNodeType::BLOOM_PRED: + RETURN_IF_PUSH_DOWN( + _normalize_bloom_filter(context, root, slot, + _slot_id_to_predicates[slot->id()], &pdt), + status); + break; + default: + break; + } + // `node_type` of function filter is FUNCTION_CALL or COMPOUND_PRED if (state()->enable_function_pushdown()) { RETURN_IF_PUSH_DOWN(_normalize_function_filters(context, slot, &pdt), status); @@ -377,7 +422,7 @@ Status ScanLocalState::_normalize_predicate(vectorized::VExprContext* c template Status ScanLocalState::_normalize_bloom_filter( - vectorized::VExprContext* expr_ctx, vectorized::VExprSPtr& root, SlotDescriptor* slot, + vectorized::VExprContext* expr_ctx, const vectorized::VExprSPtr& root, SlotDescriptor* slot, std::vector>& predicates, PushDownType* pdt) { std::shared_ptr pred = nullptr; Defer defer = [&]() { @@ -386,31 +431,29 @@ Status ScanLocalState::_normalize_bloom_filter( predicates.emplace_back(pred); } }; + DCHECK(TExprNodeType::BLOOM_PRED == root->node_type()); auto expr = root->is_rf_wrapper() ? root->get_impl() : root; - if (TExprNodeType::BLOOM_PRED == expr->node_type()) { - DCHECK(expr->get_num_children() == 1); - DCHECK(root->is_rf_wrapper()); - *pdt = _should_push_down_bloom_filter(); - if (*pdt != PushDownType::UNACCEPTABLE) { - auto* rf_expr = assert_cast(root.get()); - pred = create_bloom_filter_predicate( - _parent->intermediate_row_desc().get_column_id(slot->id()), slot->col_name(), - slot->type()->get_primitive_type() == TYPE_VARIANT - ? expr->get_child(0)->data_type() - : slot->type(), - expr->get_bloom_filter_func()); - pred->attach_profile_counter(rf_expr->filter_id(), - rf_expr->predicate_filtered_rows_counter(), - rf_expr->predicate_input_rows_counter(), - rf_expr->predicate_always_true_rows_counter()); - } + DCHECK(expr->get_num_children() == 1); + DCHECK(root->is_rf_wrapper()); + *pdt = _should_push_down_bloom_filter(); + if (*pdt != PushDownType::UNACCEPTABLE) { + auto* rf_expr = assert_cast(root.get()); + pred = create_bloom_filter_predicate( + _parent->intermediate_row_desc().get_column_id(slot->id()), slot->col_name(), + slot->type()->get_primitive_type() == TYPE_VARIANT ? expr->get_child(0)->data_type() + : slot->type(), + expr->get_bloom_filter_func()); + pred->attach_profile_counter(rf_expr->filter_id(), + rf_expr->predicate_filtered_rows_counter(), + rf_expr->predicate_input_rows_counter(), + rf_expr->predicate_always_true_rows_counter()); } return Status::OK(); } template Status ScanLocalState::_normalize_topn_filter( - vectorized::VExprContext* expr_ctx, vectorized::VExprSPtr& root, SlotDescriptor* slot, + vectorized::VExprContext* expr_ctx, const vectorized::VExprSPtr& root, SlotDescriptor* slot, std::vector>& predicates, PushDownType* pdt) { std::shared_ptr pred = nullptr; Defer defer = [&]() { @@ -419,16 +462,14 @@ Status ScanLocalState::_normalize_topn_filter( predicates.emplace_back(pred); } }; - auto expr = root->is_rf_wrapper() ? root->get_impl() : root; - if (expr->is_topn_filter()) { - *pdt = _should_push_down_topn_filter(); - if (*pdt != PushDownType::UNACCEPTABLE) { - auto& p = _parent->cast(); - auto& tmp = _state->get_query_ctx()->get_runtime_predicate( - assert_cast(expr.get())->source_node_id()); - if (_push_down_topn(tmp)) { - pred = tmp.get_predicate(p.node_id()); - } + DCHECK(root->is_topn_filter()); + *pdt = _should_push_down_topn_filter(); + if (*pdt != PushDownType::UNACCEPTABLE) { + auto& p = _parent->cast(); + auto& tmp = _state->get_query_ctx()->get_runtime_predicate( + assert_cast(root.get())->source_node_id()); + if (_push_down_topn(tmp)) { + pred = tmp.get_predicate(p.node_id()); } } return Status::OK(); @@ -436,7 +477,7 @@ Status ScanLocalState::_normalize_topn_filter( template Status ScanLocalState::_normalize_bitmap_filter( - vectorized::VExprContext* expr_ctx, vectorized::VExprSPtr& root, SlotDescriptor* slot, + vectorized::VExprContext* expr_ctx, const vectorized::VExprSPtr& root, SlotDescriptor* slot, std::vector>& predicates, PushDownType* pdt) { std::shared_ptr pred = nullptr; Defer defer = [&]() { @@ -445,24 +486,22 @@ Status ScanLocalState::_normalize_bitmap_filter( predicates.emplace_back(pred); } }; + DCHECK(TExprNodeType::BITMAP_PRED == root->node_type()); auto expr = root->is_rf_wrapper() ? root->get_impl() : root; - if (TExprNodeType::BITMAP_PRED == expr->node_type()) { - *pdt = _should_push_down_bitmap_filter(); - if (*pdt != PushDownType::UNACCEPTABLE) { - DCHECK(expr->get_num_children() == 1); - DCHECK(root->is_rf_wrapper()); - auto* rf_expr = assert_cast(root.get()); - pred = create_bitmap_filter_predicate( - _parent->intermediate_row_desc().get_column_id(slot->id()), slot->col_name(), - slot->type()->get_primitive_type() == TYPE_VARIANT - ? expr->get_child(0)->data_type() - : slot->type(), - expr->get_bitmap_filter_func()); - pred->attach_profile_counter(rf_expr->filter_id(), - rf_expr->predicate_filtered_rows_counter(), - rf_expr->predicate_input_rows_counter(), - rf_expr->predicate_always_true_rows_counter()); - } + *pdt = _should_push_down_bitmap_filter(); + if (*pdt != PushDownType::UNACCEPTABLE) { + DCHECK(expr->get_num_children() == 1); + DCHECK(root->is_rf_wrapper()); + auto* rf_expr = assert_cast(root.get()); + pred = create_bitmap_filter_predicate( + _parent->intermediate_row_desc().get_column_id(slot->id()), slot->col_name(), + slot->type()->get_primitive_type() == TYPE_VARIANT ? expr->get_child(0)->data_type() + : slot->type(), + expr->get_bitmap_filter_func()); + pred->attach_profile_counter(rf_expr->filter_id(), + rf_expr->predicate_filtered_rows_counter(), + rf_expr->predicate_input_rows_counter(), + rf_expr->predicate_always_true_rows_counter()); } return Status::OK(); } @@ -589,8 +628,8 @@ Status ScanLocalState::_eval_const_conjuncts(vectorized::VExprContext* template template -Status ScanLocalState::_normalize_in_and_eq_predicate( - vectorized::VExprContext* expr_ctx, vectorized::VExprSPtr& root, SlotDescriptor* slot, +Status ScanLocalState::_normalize_in_predicate( + vectorized::VExprContext* expr_ctx, const vectorized::VExprSPtr& root, SlotDescriptor* slot, std::vector>& predicates, ColumnValueRange& range, PushDownType* pdt) { std::shared_ptr pred = nullptr; @@ -600,124 +639,95 @@ Status ScanLocalState::_normalize_in_and_eq_predicate( predicates.emplace_back(pred); } }; - auto temp_range = ColumnValueRange::create_empty_column_value_range( - slot->is_nullable(), range.precision(), range.scale()); if (slot->get_virtual_column_expr() != nullptr) { // virtual column, do not push down return Status::OK(); } - auto expr = root->is_rf_wrapper() ? root->get_impl() : root; - // 1. Normalize in conjuncts like 'where col in (v1, v2, v3)' - if (TExprNodeType::IN_PRED == expr->node_type()) { - *pdt = _should_push_down_in_predicate(); - if (*pdt == PushDownType::UNACCEPTABLE) { - return Status::OK(); + DCHECK(!root->is_rf_wrapper()) << root->debug_string(); + DCHECK(TExprNodeType::IN_PRED == root->node_type()) << root->debug_string(); + *pdt = _should_push_down_in_predicate(); + if (*pdt == PushDownType::UNACCEPTABLE) { + return Status::OK(); + } + HybridSetBase::IteratorBase* iter = nullptr; + auto hybrid_set = root->get_set_func(); + + auto is_in = false; + if (hybrid_set != nullptr) { + // runtime filter produce VDirectInPredicate + if (hybrid_set->size() <= + _parent->cast()._max_pushdown_conditions_per_column) { + iter = hybrid_set->begin(); } - HybridSetBase::IteratorBase* iter = nullptr; - auto hybrid_set = expr->get_set_func(); - - if (hybrid_set != nullptr) { - // runtime filter produce VDirectInPredicate - if (hybrid_set->size() <= - _parent->cast()._max_pushdown_conditions_per_column) { - iter = hybrid_set->begin(); - } - } else { - // normal in predicate - auto* tmp = assert_cast(expr.get()); - if (tmp->is_not_in()) { - *pdt = PushDownType::UNACCEPTABLE; - return Status::OK(); - } - - // begin to push InPredicate value into ColumnValueRange - auto* state = reinterpret_cast( - expr_ctx->fn_context(tmp->fn_context_index()) - ->get_function_state(FunctionContext::FRAGMENT_LOCAL)); - - // xx in (col, xx, xx) should not be push down - if (!state->use_set) { - return Status::OK(); - } + is_in = true; + } else { + // normal in predicate + auto* tmp = assert_cast(root.get()); - hybrid_set = state->hybrid_set; - iter = state->hybrid_set->begin(); - } + // begin to push InPredicate value into ColumnValueRange + auto* state = reinterpret_cast( + expr_ctx->fn_context(tmp->fn_context_index()) + ->get_function_state(FunctionContext::FRAGMENT_LOCAL)); - if (iter) { - while (iter->has_next()) { - // column in (nullptr) is always false so continue to - // dispose next item - DCHECK(iter->get_value() != nullptr); - const auto* value = iter->get_value(); - RETURN_IF_ERROR(_change_value_range( - temp_range, value, ColumnValueRange::add_fixed_value_range, "")); - iter->next(); - } - range.intersection(temp_range); - } - pred = create_in_list_predicate( - _parent->intermediate_row_desc().get_column_id(slot->id()), slot->col_name(), - slot->type()->get_primitive_type() == TYPE_VARIANT ? expr->get_child(0)->data_type() - : slot->type(), - hybrid_set, false); - } else if (TExprNodeType::BINARY_PRED == expr->node_type()) { - DCHECK(expr->get_num_children() == 2); - StringRef value; - *pdt = _should_push_down_binary_predicate( - assert_cast(expr.get()), expr_ctx, &value, {"eq"}); - if (*pdt == PushDownType::UNACCEPTABLE) { + // xx in (col, xx, xx) should not be push down + if (!state->use_set) { return Status::OK(); } - // where A = nullptr should return empty result set - auto fn_name = std::string(""); - if (value.data != nullptr) { - if (!is_string_type(T) && - sizeof(typename PrimitiveTypeTraits::CppType) != value.size) { - return Status::InternalError( - "PrimitiveType {} meet invalid input value size {}, expect size {}", T, - value.size, sizeof(typename PrimitiveTypeTraits::CppType)); - } - pred = create_comparison_predicate0( - _parent->intermediate_row_desc().get_column_id(slot->id()), slot->col_name(), - slot->type()->get_primitive_type() == TYPE_VARIANT - ? expr->get_child(0)->data_type() - : slot->type(), - value, false, _arena); + is_in = !tmp->is_not_in(); - if constexpr (T == TYPE_CHAR || T == TYPE_VARCHAR || T == TYPE_STRING || - T == TYPE_HLL) { - auto val = StringRef(value.data, value.size); - RETURN_IF_ERROR(_change_value_range( - temp_range, reinterpret_cast(&val), - ColumnValueRange::add_fixed_value_range, fn_name)); - } else { - if (sizeof(typename PrimitiveTypeTraits::CppType) != value.size) { - return Status::InternalError( - "PrimitiveType {} meet invalid input value size {}, expect size {}", T, - value.size, sizeof(typename PrimitiveTypeTraits::CppType)); - } - RETURN_IF_ERROR(_change_value_range( - temp_range, reinterpret_cast(value.data), - ColumnValueRange::add_fixed_value_range, fn_name)); - } - range.intersection(temp_range); - } else { - *pdt = PushDownType::UNACCEPTABLE; + if (state->hybrid_set->contain_null() && tmp->is_not_in()) { _eos = true; _scan_dependency->set_ready(); + return Status::OK(); } + hybrid_set = state->hybrid_set; + iter = state->hybrid_set->begin(); } + if (iter) { + auto empty_range = ColumnValueRange::create_empty_column_value_range( + slot->is_nullable(), range.precision(), range.scale()); + auto& temp_range = is_in ? empty_range : range; + auto fn = is_in ? ColumnValueRange::add_fixed_value_range + : (range.is_fixed_value_range() + ? ColumnValueRange::remove_fixed_value_range + : ColumnValueRange::empty_function); + while (iter->has_next()) { + // column in (nullptr) is always false so continue to + // dispose next item + DCHECK(iter->get_value() != nullptr); + const auto* value = iter->get_value(); + RETURN_IF_ERROR( + _change_value_range(is_in, temp_range, value, fn, is_in ? "in" : "not_in")); + iter->next(); + } + if (is_in) { + range.intersection(temp_range); + } + } + pred = is_in ? create_in_list_predicate( + _parent->intermediate_row_desc().get_column_id(slot->id()), + slot->col_name(), + slot->type()->get_primitive_type() == TYPE_VARIANT + ? root->get_child(0)->data_type() + : slot->type(), + hybrid_set, false) + : create_in_list_predicate( + _parent->intermediate_row_desc().get_column_id(slot->id()), + slot->col_name(), + slot->type()->get_primitive_type() == TYPE_VARIANT + ? root->get_child(0)->data_type() + : slot->type(), + hybrid_set, false); return Status::OK(); } template template -Status ScanLocalState::_normalize_not_in_and_not_eq_predicate( - vectorized::VExprContext* expr_ctx, vectorized::VExprSPtr& root, SlotDescriptor* slot, +Status ScanLocalState::_normalize_binary_predicate( + vectorized::VExprContext* expr_ctx, const vectorized::VExprSPtr& root, SlotDescriptor* slot, std::vector>& predicates, ColumnValueRange& range, PushDownType* pdt) { std::shared_ptr pred = nullptr; @@ -727,123 +737,133 @@ Status ScanLocalState::_normalize_not_in_and_not_eq_predicate( predicates.emplace_back(pred); } }; - bool is_fixed_range = range.is_fixed_value_range(); - auto expr = root->is_rf_wrapper() ? root->get_impl() : root; - // 1. Normalize in conjuncts like 'where col in (v1, v2, v3)' - if (TExprNodeType::IN_PRED == expr->node_type()) { - *pdt = _should_push_down_in_predicate(); - if (*pdt == PushDownType::UNACCEPTABLE) { - return Status::OK(); - } - /// `VDirectInPredicate` here should not be pushed down. - /// here means the `VDirectInPredicate` is too big to be converted into `ColumnValueRange`. - /// For non-key columns and `_storage_no_merge()` is false, this predicate should not be pushed down. - if (expr->get_set_func() != nullptr) { - *pdt = PushDownType::UNACCEPTABLE; - return Status::OK(); - } - - auto* tmp = assert_cast(expr.get()); - if (!tmp->is_not_in()) { - *pdt = PushDownType::UNACCEPTABLE; - return Status::OK(); - } - // begin to push InPredicate value into ColumnValueRange - auto* state = reinterpret_cast( - expr_ctx->fn_context(tmp->fn_context_index()) - ->get_function_state(FunctionContext::FRAGMENT_LOCAL)); - - // xx in (col, xx, xx) should not be push down - if (!state->use_set) { - *pdt = PushDownType::UNACCEPTABLE; - return Status::OK(); - } - - HybridSetBase::IteratorBase* iter = state->hybrid_set->begin(); - auto fn_name = std::string(""); - if (state->hybrid_set->contain_null()) { - _eos = true; - _scan_dependency->set_ready(); - } - pred = create_in_list_predicate( - _parent->intermediate_row_desc().get_column_id(slot->id()), slot->col_name(), - slot->type()->get_primitive_type() == TYPE_VARIANT ? expr->get_child(0)->data_type() - : slot->type(), - state->hybrid_set, false); - while (iter->has_next()) { - // column not in (nullptr) is always true - DCHECK(iter->get_value() != nullptr); - const auto value = iter->get_value(); - if (is_fixed_range) { - RETURN_IF_ERROR(_change_value_range( - range, value, ColumnValueRange::remove_fixed_value_range, fn_name)); - } - iter->next(); - } - } else if (TExprNodeType::BINARY_PRED == expr->node_type()) { - DCHECK(expr->get_num_children() == 2); + if (slot->get_virtual_column_expr() != nullptr) { + // virtual column, do not push down + return Status::OK(); + } - StringRef value; - *pdt = _should_push_down_binary_predicate( - assert_cast(expr.get()), expr_ctx, &value, {"ne"}); - if (*pdt == PushDownType::UNACCEPTABLE) { - return Status::OK(); + DCHECK(!root->is_rf_wrapper()) << root->debug_string(); + DCHECK(TExprNodeType::BINARY_PRED == root->node_type()) << root->debug_string(); + DCHECK(root->get_num_children() == 2); + StringRef value; + *pdt = _should_push_down_binary_predicate( + assert_cast(root.get()), expr_ctx, &value, + {"eq", "ne", "lt", "gt", "le", "ge"}); + if (*pdt == PushDownType::UNACCEPTABLE) { + return Status::OK(); + } + const std::string& function_name = + assert_cast(root.get())->fn().name.function_name; + auto op = to_olap_filter_type(function_name); + auto is_equal_op = op == SQLFilterOp::FILTER_EQ || op == SQLFilterOp::FILTER_NE; + auto empty_range = ColumnValueRange::create_empty_column_value_range( + slot->is_nullable(), range.precision(), range.scale()); + auto& temp_range = op == SQLFilterOp::FILTER_EQ ? empty_range : range; + if (value.data != nullptr) { + if (!is_string_type(T) && sizeof(typename PrimitiveTypeTraits::CppType) != value.size) { + return Status::InternalError( + "PrimitiveType {} meet invalid input value size {}, expect size {}", T, + value.size, sizeof(typename PrimitiveTypeTraits::CppType)); } - - // where A = nullptr should return empty result set - if (value.data != nullptr) { - if (!is_string_type(T) && - sizeof(typename PrimitiveTypeTraits::CppType) != value.size) { - return Status::InternalError( - "PrimitiveType {} meet invalid input value size {}, expect size {}", T, - value.size, sizeof(typename PrimitiveTypeTraits::CppType)); - } + switch (op) { + case SQLFilterOp::FILTER_EQ: + pred = create_comparison_predicate0( + _parent->intermediate_row_desc().get_column_id(slot->id()), slot->col_name(), + slot->type()->get_primitive_type() == TYPE_VARIANT + ? root->get_child(0)->data_type() + : slot->type(), + value, false, _arena); + break; + case SQLFilterOp::FILTER_NE: pred = create_comparison_predicate0( _parent->intermediate_row_desc().get_column_id(slot->id()), slot->col_name(), slot->type()->get_primitive_type() == TYPE_VARIANT - ? expr->get_child(0)->data_type() + ? root->get_child(0)->data_type() : slot->type(), value, false, _arena); - auto fn_name = std::string(""); - if constexpr (T == TYPE_CHAR || T == TYPE_VARCHAR || T == TYPE_STRING || - T == TYPE_HLL) { - auto val = StringRef(value.data, value.size); - if (is_fixed_range) { - RETURN_IF_ERROR(_change_value_range( - range, reinterpret_cast(&val), - ColumnValueRange::remove_fixed_value_range, fn_name)); - } - } else { - if (is_fixed_range) { - RETURN_IF_ERROR(_change_value_range( - range, reinterpret_cast(value.data), - ColumnValueRange::remove_fixed_value_range, fn_name)); - } - } + break; + case SQLFilterOp::FILTER_LESS: + pred = create_comparison_predicate0( + _parent->intermediate_row_desc().get_column_id(slot->id()), slot->col_name(), + slot->type()->get_primitive_type() == TYPE_VARIANT + ? root->get_child(0)->data_type() + : slot->type(), + value, false, _arena); + break; + case SQLFilterOp::FILTER_LARGER: + pred = create_comparison_predicate0( + _parent->intermediate_row_desc().get_column_id(slot->id()), slot->col_name(), + slot->type()->get_primitive_type() == TYPE_VARIANT + ? root->get_child(0)->data_type() + : slot->type(), + value, false, _arena); + break; + case SQLFilterOp::FILTER_LESS_OR_EQUAL: + pred = create_comparison_predicate0( + _parent->intermediate_row_desc().get_column_id(slot->id()), slot->col_name(), + slot->type()->get_primitive_type() == TYPE_VARIANT + ? root->get_child(0)->data_type() + : slot->type(), + value, false, _arena); + break; + case SQLFilterOp::FILTER_LARGER_OR_EQUAL: + pred = create_comparison_predicate0( + _parent->intermediate_row_desc().get_column_id(slot->id()), slot->col_name(), + slot->type()->get_primitive_type() == TYPE_VARIANT + ? root->get_child(0)->data_type() + : slot->type(), + value, false, _arena); + break; + default: + throw Exception(Status::InternalError("Unsupported function name: {}", function_name)); + } + + auto fn = op == SQLFilterOp::FILTER_EQ ? ColumnValueRange::add_fixed_value_range + : op == SQLFilterOp::FILTER_NE + ? (range.is_fixed_value_range() + ? ColumnValueRange::remove_fixed_value_range + : ColumnValueRange::empty_function) + : ColumnValueRange::add_value_range; + if constexpr (T == TYPE_CHAR || T == TYPE_VARCHAR || T == TYPE_STRING || T == TYPE_HLL) { + auto val = StringRef(value.data, value.size); + RETURN_IF_ERROR(_change_value_range(is_equal_op, temp_range, + reinterpret_cast(&val), fn, function_name)); } else { - *pdt = PushDownType::UNACCEPTABLE; - _eos = true; - _scan_dependency->set_ready(); + if (sizeof(typename PrimitiveTypeTraits::CppType) != value.size) { + return Status::InternalError( + "PrimitiveType {} meet invalid input value size {}, expect size {}", T, + value.size, sizeof(typename PrimitiveTypeTraits::CppType)); + } + RETURN_IF_ERROR(_change_value_range(is_equal_op, temp_range, + reinterpret_cast(value.data), fn, + function_name)); + } + if (op == SQLFilterOp::FILTER_EQ) { + range.intersection(temp_range); } } else { *pdt = PushDownType::UNACCEPTABLE; + _eos = true; + _scan_dependency->set_ready(); } + return Status::OK(); } template -template -Status ScanLocalState::_change_value_range(ColumnValueRange& temp_range, +template +Status ScanLocalState::_change_value_range(bool is_equal_op, + ColumnValueRange& temp_range, const void* value, const ChangeFixedValueRangeFunc& func, const std::string& fn_name) { if constexpr (PrimitiveType == TYPE_DATE) { VecDateTimeValue tmp_value; memcpy(&tmp_value, value, sizeof(VecDateTimeValue)); - if constexpr (IsFixed) { + if (is_equal_op) { if (!tmp_value.check_loss_accuracy_cast_to_date()) { - func(temp_range, + func(temp_range, to_olap_filter_type(fn_name), reinterpret_cast::CppType*>( &tmp_value)); } @@ -858,22 +878,10 @@ Status ScanLocalState::_change_value_range(ColumnValueRange::CppType*>( - value)); - } else { - func(temp_range, to_olap_filter_type(fn_name), - reinterpret_cast::CppType*>( - reinterpret_cast(value))); - } + func(temp_range, to_olap_filter_type(fn_name), + reinterpret_cast::CppType*>(value)); } else if constexpr (PrimitiveType == TYPE_HLL) { - if constexpr (IsFixed) { - func(temp_range, reinterpret_cast(value)); - } else { - func(temp_range, to_olap_filter_type(fn_name), - reinterpret_cast(value)); - } + func(temp_range, to_olap_filter_type(fn_name), reinterpret_cast(value)); } else if constexpr ((PrimitiveType == TYPE_DECIMALV2) || (PrimitiveType == TYPE_DATETIMEV2) || (PrimitiveType == TYPE_TINYINT) || (PrimitiveType == TYPE_SMALLINT) || (PrimitiveType == TYPE_INT) || (PrimitiveType == TYPE_BIGINT) || @@ -883,33 +891,20 @@ Status ScanLocalState::_change_value_range(ColumnValueRange::CppType*>( - value)); - } else { - func(temp_range, to_olap_filter_type(fn_name), - reinterpret_cast::CppType*>( - value)); - } + func(temp_range, to_olap_filter_type(fn_name), + reinterpret_cast::CppType*>(value)); } else if constexpr (is_string_type(PrimitiveType)) { - if constexpr (IsFixed) { - func(temp_range, reinterpret_cast(value)); - } else { - func(temp_range, to_olap_filter_type(fn_name), - reinterpret_cast(value)); - } + func(temp_range, to_olap_filter_type(fn_name), reinterpret_cast(value)); } else { static_assert(always_false_v); } - return Status::OK(); } template template Status ScanLocalState::_normalize_is_null_predicate( - vectorized::VExprContext* expr_ctx, vectorized::VExprSPtr& root, SlotDescriptor* slot, + vectorized::VExprContext* expr_ctx, const vectorized::VExprSPtr& root, SlotDescriptor* slot, std::vector>& predicates, ColumnValueRange& range, PushDownType* pdt) { std::shared_ptr pred = nullptr; @@ -919,8 +914,9 @@ Status ScanLocalState::_normalize_is_null_predicate( predicates.emplace_back(pred); } }; - auto expr = root->is_rf_wrapper() ? root->get_impl() : root; - if (auto fn_call = dynamic_cast(expr.get())) { + DCHECK(!root->is_rf_wrapper()) << root->debug_string(); + DCHECK(TExprNodeType::FUNCTION_CALL == root->node_type()) << root->debug_string(); + if (auto fn_call = dynamic_cast(root.get())) { *pdt = _should_push_down_is_null_predicate(fn_call); } else { *pdt = PushDownType::UNACCEPTABLE; @@ -930,7 +926,7 @@ Status ScanLocalState::_normalize_is_null_predicate( return Status::OK(); } - auto fn_call = assert_cast(expr.get()); + auto fn_call = assert_cast(root.get()); if (fn_call->fn().name.function_name == "is_null_pred") { pred = NullPredicate::create_shared( _parent->intermediate_row_desc().get_column_id(slot->id()), slot->col_name(), true, @@ -951,91 +947,6 @@ Status ScanLocalState::_normalize_is_null_predicate( return Status::OK(); } -template -template -Status ScanLocalState::_normalize_noneq_binary_predicate( - vectorized::VExprContext* expr_ctx, vectorized::VExprSPtr& root, SlotDescriptor* slot, - std::vector>& predicates, ColumnValueRange& range, - PushDownType* pdt) { - std::shared_ptr pred = nullptr; - Defer defer = [&]() { - if (pred) { - DCHECK(*pdt != PushDownType::UNACCEPTABLE) << root->debug_string(); - predicates.emplace_back(pred); - } - }; - auto expr = root->is_rf_wrapper() ? root->get_impl() : root; - if (TExprNodeType::BINARY_PRED == expr->node_type()) { - DCHECK(expr->get_num_children() == 2); - - StringRef value; - *pdt = _should_push_down_binary_predicate( - assert_cast(expr.get()), expr_ctx, &value, - {"lt", "gt", "le", "ge"}); - if (*pdt == PushDownType::UNACCEPTABLE) { - return Status::OK(); - } - const std::string& function_name = - assert_cast(expr.get())->fn().name.function_name; - - // where A = nullptr should return empty result set - if (value.data != nullptr) { - if (function_name == "lt") { - pred = create_comparison_predicate0( - _parent->intermediate_row_desc().get_column_id(slot->id()), - slot->col_name(), - slot->type()->get_primitive_type() == TYPE_VARIANT - ? expr->get_child(0)->data_type() - : slot->type(), - value, false, _arena); - } else if (function_name == "gt") { - pred = create_comparison_predicate0( - _parent->intermediate_row_desc().get_column_id(slot->id()), - slot->col_name(), - slot->type()->get_primitive_type() == TYPE_VARIANT - ? expr->get_child(0)->data_type() - : slot->type(), - value, false, _arena); - } else if (function_name == "le") { - pred = create_comparison_predicate0( - _parent->intermediate_row_desc().get_column_id(slot->id()), - slot->col_name(), - slot->type()->get_primitive_type() == TYPE_VARIANT - ? expr->get_child(0)->data_type() - : slot->type(), - value, false, _arena); - } else if (function_name == "ge") { - pred = create_comparison_predicate0( - _parent->intermediate_row_desc().get_column_id(slot->id()), - slot->col_name(), - slot->type()->get_primitive_type() == TYPE_VARIANT - ? expr->get_child(0)->data_type() - : slot->type(), - value, false, _arena); - } else { - throw Exception( - Status::InternalError("Unsupported function name: {}", function_name)); - } - if constexpr (T == TYPE_CHAR || T == TYPE_VARCHAR || T == TYPE_STRING || - T == TYPE_HLL) { - auto val = StringRef(value.data, value.size); - RETURN_IF_ERROR(_change_value_range(range, reinterpret_cast(&val), - ColumnValueRange::add_value_range, - function_name)); - } else { - RETURN_IF_ERROR(_change_value_range( - range, reinterpret_cast(value.data), - ColumnValueRange::add_value_range, function_name)); - } - } else { - *pdt = PushDownType::UNACCEPTABLE; - _eos = true; - _scan_dependency->set_ready(); - } - } - return Status::OK(); -} - template Status ScanLocalState::_prepare_scanners() { std::list scanners; diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 03f59b9c1f5a9b..10c58729eefbbe 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -218,10 +218,6 @@ class ScanLocalState : public ScanLocalStateBase { virtual PushDownType _should_push_down_in_predicate() const { return PushDownType::UNACCEPTABLE; } - virtual PushDownType _should_push_down_or_predicate( - const vectorized::VExprContext* expr_ctx) const { - return PushDownType::UNACCEPTABLE; - } virtual PushDownType _should_push_down_binary_predicate( vectorized::VectorizedFnCall* fn_call, vectorized::VExprContext* expr_ctx, StringRef* constant_val, const std::set fn_name) const { @@ -251,53 +247,47 @@ class ScanLocalState : public ScanLocalStateBase { Status _normalize_predicate(vectorized::VExprContext* context, const vectorized::VExprSPtr& root, vectorized::VExprSPtr& output_expr); + bool _is_predicate_acting_on_slot(const vectorized::VExprSPtrs& children, + SlotDescriptor** slot_desc, ColumnValueRangeType** range); Status _eval_const_conjuncts(vectorized::VExprContext* expr_ctx, PushDownType* pdt); - Status _normalize_bloom_filter(vectorized::VExprContext* expr_ctx, vectorized::VExprSPtr& root, - SlotDescriptor* slot, + template + Status _normalize_in_predicate(vectorized::VExprContext* expr_ctx, + const vectorized::VExprSPtr& root, SlotDescriptor* slot, + std::vector>& predicates, + ColumnValueRange& range, PushDownType* pdt); + template + Status _normalize_binary_predicate(vectorized::VExprContext* expr_ctx, + const vectorized::VExprSPtr& root, SlotDescriptor* slot, + std::vector>& predicates, + ColumnValueRange& range, PushDownType* pdt); + Status _normalize_bloom_filter(vectorized::VExprContext* expr_ctx, + const vectorized::VExprSPtr& root, SlotDescriptor* slot, std::vector>& predicates, PushDownType* pdt); - Status _normalize_topn_filter(vectorized::VExprContext* expr_ctx, vectorized::VExprSPtr& root, - SlotDescriptor* slot, + Status _normalize_topn_filter(vectorized::VExprContext* expr_ctx, + const vectorized::VExprSPtr& root, SlotDescriptor* slot, std::vector>& predicates, PushDownType* pdt); - Status _normalize_bitmap_filter(vectorized::VExprContext* expr_ctx, vectorized::VExprSPtr& root, - SlotDescriptor* slot, + Status _normalize_bitmap_filter(vectorized::VExprContext* expr_ctx, + const vectorized::VExprSPtr& root, SlotDescriptor* slot, std::vector>& predicates, PushDownType* pdt); Status _normalize_function_filters(vectorized::VExprContext* expr_ctx, SlotDescriptor* slot, PushDownType* pdt); - bool _is_predicate_acting_on_slot(const vectorized::VExprSPtrs& children, - SlotDescriptor** slot_desc, ColumnValueRangeType** range); - - template - Status _normalize_in_and_eq_predicate(vectorized::VExprContext* expr_ctx, - vectorized::VExprSPtr& root, SlotDescriptor* slot, - std::vector>& predicates, - ColumnValueRange& range, PushDownType* pdt); - template - Status _normalize_not_in_and_not_eq_predicate( - vectorized::VExprContext* expr_ctx, vectorized::VExprSPtr& root, SlotDescriptor* slot, - std::vector>& predicates, ColumnValueRange& range, - PushDownType* pdt); - - template - Status _normalize_noneq_binary_predicate( - vectorized::VExprContext* expr_ctx, vectorized::VExprSPtr& root, SlotDescriptor* slot, - std::vector>& predicates, ColumnValueRange& range, - PushDownType* pdt); template Status _normalize_is_null_predicate(vectorized::VExprContext* expr_ctx, - vectorized::VExprSPtr& root, SlotDescriptor* slot, + const vectorized::VExprSPtr& root, SlotDescriptor* slot, std::vector>& predicates, ColumnValueRange& range, PushDownType* pdt); - template - Status _change_value_range(ColumnValueRange& range, const void* value, - const ChangeFixedValueRangeFunc& func, const std::string& fn_name); + template + Status _change_value_range(bool is_equal_op, ColumnValueRange& range, + const void* value, const ChangeFixedValueRangeFunc& func, + const std::string& fn_name); Status _prepare_scanners(); diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h index 924ecb18914956..75cc9dc1646abc 100644 --- a/be/src/vec/exprs/vexpr.h +++ b/be/src/vec/exprs/vexpr.h @@ -195,7 +195,7 @@ class VExpr { virtual bool is_literal() const { return false; } - MOCK_FUNCTION TExprNodeType::type node_type() const { return _node_type; } + virtual TExprNodeType::type node_type() const { return _node_type; } TExprOpcode::type op() const { return _opcode; } diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.h b/be/src/vec/exprs/vruntimefilter_wrapper.h index 64ab79121d3728..4938313e44a9be 100644 --- a/be/src/vec/exprs/vruntimefilter_wrapper.h +++ b/be/src/vec/exprs/vruntimefilter_wrapper.h @@ -62,6 +62,7 @@ class VRuntimeFilterWrapper final : public VExpr { void close(VExprContext* context, FunctionContext::FunctionStateScope scope) override; const std::string& expr_name() const override; const VExprSPtrs& children() const override { return _impl->children(); } + TExprNodeType::type node_type() const override { return _impl->node_type(); } uint64_t get_digest(uint64_t seed) const override { seed = _impl->get_digest(seed);