Skip to content

Commit 501a5a4

Browse files
authored
feat: Add RuleFilterFlattenOr (#18621)
* feat: Add `RuleFilterFlattenOr`, which folds the outer `or` of a Filter predicate into `or_filters` * chore: codefmt * fix: fix `or_filters` compatibility with statistics calculation and optimization rules * chore: codefmt * chore: codefmt * chore: codefmt
1 parent a57986d commit 501a5a4

File tree

18 files changed

+257
-46
lines changed

18 files changed

+257
-46
lines changed

src/query/expression/src/expression.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -771,9 +771,9 @@ impl<Index: ColumnIndex> Expr<Index> {
771771
}
772772
}
773773

774-
pub fn visit_func(&self, func_name: &str, visitor: &mut impl FnMut(&FunctionCall<Index>)) {
774+
pub fn visit_func(&self, func_names: &[&str], visitor: &mut impl FnMut(&FunctionCall<Index>)) {
775775
struct Visitor<'a, Index: ColumnIndex, F: FnMut(&FunctionCall<Index>)> {
776-
name: String,
776+
names: &'a [&'a str],
777777
visitor: &'a mut F,
778778
_marker: std::marker::PhantomData<Index>,
779779
}
@@ -785,15 +785,15 @@ impl<Index: ColumnIndex> Expr<Index> {
785785
&mut self,
786786
call: &FunctionCall<Index>,
787787
) -> Result<Option<Expr<Index>>, Self::Error> {
788-
if call.function.signature.name == self.name {
788+
if self.names.contains(&call.function.signature.name.as_str()) {
789789
(self.visitor)(call);
790790
}
791791
Self::visit_function_call(call, self)
792792
}
793793
}
794794

795795
visit_expr(self, &mut Visitor {
796-
name: func_name.to_string(),
796+
names: func_names,
797797
visitor,
798798
_marker: std::marker::PhantomData,
799799
})

src/query/expression/src/utils/filter_helper.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ impl FilterHelpers {
113113
// for the set of equality columns, let's call it `ecs`
114114
// if any side of `or` of the name columns is not in `ecs`, it's valid
115115
// otherwise, it's invalid
116-
expr.visit_func("or", &mut |call| {
116+
expr.visit_func(&["or", "or_filters"], &mut |call| {
117117
for arg in call.args.iter() {
118118
let mut ecs = HashSet::new();
119119
arg.find_function_literals("eq", &mut |col_name, _scalar, _| {

src/query/sql/src/planner/binder/scalar_common.rs

Lines changed: 57 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,19 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::borrow::Cow;
1516
use std::collections::HashSet;
1617

1718
use databend_common_exception::Result;
1819
use databend_common_expression::types::DataType;
20+
use databend_common_expression::Scalar;
1921

2022
use crate::optimizer::ir::RelationalProperty;
2123
use crate::plans::walk_expr;
2224
use crate::plans::BoundColumnRef;
2325
use crate::plans::CastExpr;
26+
use crate::plans::ConstantExpr;
27+
use crate::plans::FunctionCall;
2428
use crate::plans::ScalarExpr;
2529
use crate::plans::Visitor;
2630

@@ -111,13 +115,32 @@ pub enum JoinPredicate<'a> {
111115
Left(&'a ScalarExpr),
112116
Right(&'a ScalarExpr),
113117
Both {
114-
left: &'a ScalarExpr,
115-
right: &'a ScalarExpr,
118+
left: Box<Cow<'a, ScalarExpr>>,
119+
right: Box<Cow<'a, ScalarExpr>>,
116120
is_equal_op: bool,
117121
},
118122
Other(&'a ScalarExpr),
119123
}
120124

125+
fn fold_or_arguments(iter: impl Iterator<Item = ScalarExpr>) -> ScalarExpr {
126+
iter.fold(
127+
ConstantExpr {
128+
span: None,
129+
value: Scalar::Boolean(false),
130+
}
131+
.into(),
132+
|acc, arg| {
133+
FunctionCall {
134+
span: None,
135+
func_name: "or".to_string(),
136+
params: vec![],
137+
arguments: vec![acc, arg.clone()],
138+
}
139+
.into()
140+
},
141+
)
142+
}
143+
121144
impl<'a> JoinPredicate<'a> {
122145
pub fn new(
123146
scalar: &'a ScalarExpr,
@@ -128,7 +151,35 @@ impl<'a> JoinPredicate<'a> {
128151
return Self::ALL(scalar);
129152
}
130153

154+
if satisfied_by(scalar, left_prop) {
155+
return Self::Left(scalar);
156+
}
157+
158+
if satisfied_by(scalar, right_prop) {
159+
return Self::Right(scalar);
160+
}
161+
131162
if let ScalarExpr::FunctionCall(func) = scalar {
163+
if func.func_name == "or_filters" && func.arguments.len() > 1 {
164+
let mut left_exprs = Vec::new();
165+
let mut right_exprs = Vec::new();
166+
167+
for expr in func.arguments.iter() {
168+
if satisfied_by(expr, left_prop) {
169+
left_exprs.push(expr.clone());
170+
} else if satisfied_by(expr, right_prop) {
171+
right_exprs.push(expr.clone());
172+
} else {
173+
return Self::Other(scalar);
174+
}
175+
}
176+
return Self::Both {
177+
left: Box::new(Cow::Owned(fold_or_arguments(left_exprs.into_iter()))),
178+
right: Box::new(Cow::Owned(fold_or_arguments(right_exprs.into_iter()))),
179+
is_equal_op: false,
180+
};
181+
}
182+
132183
if func.arguments.len() > 2 {
133184
return Self::Other(scalar);
134185
}
@@ -140,30 +191,22 @@ impl<'a> JoinPredicate<'a> {
140191

141192
if satisfied_by(left, left_prop) && satisfied_by(right, right_prop) {
142193
return Self::Both {
143-
left,
144-
right,
194+
left: Box::new(Cow::Borrowed(left)),
195+
right: Box::new(Cow::Borrowed(right)),
145196
is_equal_op,
146197
};
147198
}
148199

149200
if satisfied_by(right, left_prop) && satisfied_by(left, right_prop) {
150201
return Self::Both {
151-
left: right,
152-
right: left,
202+
left: Box::new(Cow::Borrowed(right)),
203+
right: Box::new(Cow::Borrowed(left)),
153204
is_equal_op,
154205
};
155206
}
156207
}
157208
}
158209

159-
if satisfied_by(scalar, left_prop) {
160-
return Self::Left(scalar);
161-
}
162-
163-
if satisfied_by(scalar, right_prop) {
164-
return Self::Right(scalar);
165-
}
166-
167210
Self::Other(scalar)
168211
}
169212
}

src/query/sql/src/planner/optimizer/ir/stats/selectivity.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,16 @@ impl<'a> SelectivityEstimator<'a> {
9797
left_selectivity.min(right_selectivity)
9898
}
9999

100-
ScalarExpr::FunctionCall(func) if func.func_name == "or" => {
101-
let left_selectivity = self.compute_selectivity(&func.arguments[0], false)?;
102-
let right_selectivity = self.compute_selectivity(&func.arguments[1], false)?;
103-
left_selectivity + right_selectivity - left_selectivity * right_selectivity
100+
ScalarExpr::FunctionCall(func)
101+
if matches!(func.func_name.as_str(), "or" | "or_filters") =>
102+
{
103+
func.arguments
104+
.iter()
105+
.map(|arg| self.compute_selectivity(arg, false))
106+
.try_fold(0.0, |acc, p| {
107+
let p = p?;
108+
Result::Ok(acc + p - acc * p)
109+
})?
104110
}
105111

106112
ScalarExpr::FunctionCall(func) if func.func_name == "not" => {

src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/decorrelate.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,8 @@ impl SubqueryDecorrelatorOptimizer {
156156
..
157157
} => {
158158
if is_equal_op {
159-
left_conditions.push(left.clone());
160-
right_conditions.push(right.clone());
159+
left_conditions.push(left.into_owned());
160+
right_conditions.push(right.into_owned());
161161
} else {
162162
non_equi_conditions.push(pred.clone());
163163
}

src/query/sql/src/planner/optimizer/optimizers/operator/filter/normalize_disjunctive_filter.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,9 @@ fn predicate_scalar(scalar: &ScalarExpr) -> PredicateScalar {
7979
}
8080
PredicateScalar::And(and_args)
8181
}
82-
ScalarExpr::FunctionCall(func) if func.func_name == "or" => {
82+
ScalarExpr::FunctionCall(func)
83+
if matches!(func.func_name.as_str(), "or" | "or_filters") =>
84+
{
8385
let mut or_args = vec![];
8486
for argument in func.arguments.iter() {
8587
// Recursively flatten the OR expressions.

src/query/sql/src/planner/optimizer/optimizers/rule/factory.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use crate::optimizer::optimizers::rule::RuleEliminateEvalScalar;
2424
use crate::optimizer::optimizers::rule::RuleEliminateFilter;
2525
use crate::optimizer::optimizers::rule::RuleEliminateSort;
2626
use crate::optimizer::optimizers::rule::RuleEliminateUnion;
27+
use crate::optimizer::optimizers::rule::RuleFilterFlattenOr;
2728
use crate::optimizer::optimizers::rule::RuleFilterNulls;
2829
use crate::optimizer::optimizers::rule::RuleFoldCountAggregate;
2930
use crate::optimizer::optimizers::rule::RuleGroupingSetsToUnion;
@@ -73,6 +74,7 @@ impl RuleFactory {
7374
RuleID::FilterNulls => Ok(Box::new(RuleFilterNulls::new(
7475
ctx.get_enable_distributed_optimization(),
7576
))),
77+
RuleID::FilterFlattenOr => Ok(Box::new(RuleFilterFlattenOr::new())),
7678
RuleID::PushDownFilterUnion => Ok(Box::new(RulePushDownFilterUnion::new())),
7779
RuleID::PushDownFilterEvalScalar => Ok(Box::new(RulePushDownFilterEvalScalar::new())),
7880
RuleID::PushDownFilterJoin => Ok(Box::new(RulePushDownFilterJoin::new(metadata))),

src/query/sql/src/planner/optimizer/optimizers/rule/filter_rules/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
mod rule_eliminate_filter;
16+
mod rule_filter_flatten_or;
1617
mod rule_filter_nulls;
1718
mod rule_merge_filter;
1819
mod rule_merge_filter_into_mutation;
@@ -30,6 +31,7 @@ mod rule_push_down_sort_filter_scan;
3031
mod rule_push_down_sort_scan;
3132

3233
pub use rule_eliminate_filter::RuleEliminateFilter;
34+
pub use rule_filter_flatten_or::RuleFilterFlattenOr;
3335
pub use rule_filter_nulls::RuleFilterNulls;
3436
pub use rule_merge_filter::RuleMergeFilter;
3537
pub use rule_merge_filter_into_mutation::RuleMergeFilterIntoMutation;
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use databend_common_exception::Result;
18+
use databend_common_expression::Scalar;
19+
20+
use crate::optimizer::ir::Matcher;
21+
use crate::optimizer::ir::SExpr;
22+
use crate::optimizer::optimizers::rule::Rule;
23+
use crate::optimizer::optimizers::rule::RuleID;
24+
use crate::optimizer::optimizers::rule::TransformResult;
25+
use crate::plans::ConstantExpr;
26+
use crate::plans::Filter;
27+
use crate::plans::FunctionCall;
28+
use crate::plans::RelOp;
29+
use crate::plans::ScalarExpr;
30+
31+
pub struct RuleFilterFlattenOr {
32+
id: RuleID,
33+
matchers: Vec<Matcher>,
34+
}
35+
36+
impl RuleFilterFlattenOr {
37+
pub fn new() -> Self {
38+
Self {
39+
id: RuleID::FilterFlattenOr,
40+
// Filter
41+
// \
42+
// *
43+
matchers: vec![Matcher::MatchOp {
44+
op_type: RelOp::Filter,
45+
children: vec![Matcher::Leaf],
46+
}],
47+
}
48+
}
49+
}
50+
51+
impl Default for RuleFilterFlattenOr {
52+
fn default() -> Self {
53+
Self::new()
54+
}
55+
}
56+
57+
impl Rule for RuleFilterFlattenOr {
58+
fn id(&self) -> RuleID {
59+
self.id
60+
}
61+
62+
fn apply(&self, s_expr: &SExpr, state: &mut TransformResult) -> Result<()> {
63+
let mut filter: Filter = s_expr.plan().clone().try_into()?;
64+
let mut has_replace = false;
65+
66+
for predicate in filter.predicates.iter_mut() {
67+
let mut or_exprs = Vec::new();
68+
flatten_or_expr(predicate, &mut or_exprs);
69+
70+
if or_exprs.len() > 2 {
71+
let replace_expr = FunctionCall {
72+
span: None,
73+
func_name: "or_filters".to_string(),
74+
params: vec![],
75+
arguments: or_exprs,
76+
}
77+
.into();
78+
if predicate == &replace_expr {
79+
continue;
80+
}
81+
*predicate = replace_expr;
82+
has_replace = true
83+
}
84+
}
85+
if !has_replace {
86+
state.add_result(s_expr.clone());
87+
return Ok(());
88+
}
89+
let mut res =
90+
SExpr::create_unary(Arc::new(filter.into()), Arc::new(s_expr.child(0)?.clone()));
91+
res.set_applied_rule(&self.id());
92+
state.add_result(res);
93+
94+
Ok(())
95+
}
96+
97+
fn matchers(&self) -> &[Matcher] {
98+
&self.matchers
99+
}
100+
}
101+
102+
fn flatten_or_expr(expr: &ScalarExpr, or_exprs: &mut Vec<ScalarExpr>) {
103+
match expr {
104+
ScalarExpr::FunctionCall(func)
105+
if matches!(func.func_name.as_str(), "or" | "or_filters") =>
106+
{
107+
for argument in func.arguments.iter() {
108+
flatten_or_expr(argument, or_exprs);
109+
}
110+
}
111+
ScalarExpr::ConstantExpr(ConstantExpr { value, span }) if value.is_null() => {
112+
// predicates cannot directly pass null
113+
or_exprs.push(
114+
ConstantExpr {
115+
span: *span,
116+
value: Scalar::Boolean(false),
117+
}
118+
.into(),
119+
)
120+
}
121+
_ => or_exprs.push(expr.clone()),
122+
}
123+
}

src/query/sql/src/planner/optimizer/optimizers/rule/filter_rules/rule_push_down_filter_join.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -267,8 +267,8 @@ pub fn try_push_down_filter_join(s_expr: &SExpr, metadata: MetadataRef) -> Resul
267267
}
268268
JoinPredicate::Both { left, right, .. } => {
269269
join.equi_conditions.push(JoinEquiCondition::new(
270-
left.clone(),
271-
right.clone(),
270+
left.into_owned(),
271+
right.into_owned(),
272272
false,
273273
));
274274
}

0 commit comments

Comments
 (0)