Skip to content

Commit 91e76f1

Browse files
sundy-liSkyFan2002zhang2014
authored
feat(query): add rule_grouping_sets_to_union (#18413)
* add MaterializedCTE plan * build pipeline * build pipeline * add operator * remove m cte temp table * bind * fix * remove unused field * fix bind * fix schema * fix * make lint * fix * fix join * fix * refine explain * fix * fix * fix * fix * fix * fix * fix * fix * CleanupUnusedCTE * fix * fix * fix * fix * refine * refine * make lint * fix * add log * fix * fix * make lint * fix * fix * fix * fix * fix * disable distributed optimization * fix merge * fix explain join * fix logic test * fix logic test * add ref count * refactor: streaming CTE consumption * refactor plan * fix * fix * enable distributed * fix logic test * fix serial cte * fix test * fix fragment type * fix replace range join * fix explain join order * fix logic test * feat(query): add rule_grouping_sets_to_union * feat(query): add rule_grouping_sets_to_union * simplify * ref_count calculation is not required when constructing MaterializedCTE * Merge fb * simplify MaterializedCTE * simplify CTEConsumer * Merge * Update src/query/service/src/pipelines/builders/builder_sequence.rs Co-authored-by: Winter Zhang <[email protected]> * Update src/query/service/src/pipelines/builders/builder_materialized_cte.rs Co-authored-by: Winter Zhang <[email protected]> * make lint * rename CTEConsumer to MaterializeCTERef * Update src/query/sql/src/planner/optimizer/optimizers/operator/cte/cleanup_unused_cte.rs Co-authored-by: Winter Zhang <[email protected]> * Merge * Merge * add channel size config * Merge * Merge * update * update --------- Co-authored-by: sky <[email protected]> Co-authored-by: Winter Zhang <[email protected]>
1 parent 6f0ae5b commit 91e76f1

File tree

14 files changed

+504
-22
lines changed

14 files changed

+504
-22
lines changed

src/query/service/src/pipelines/builders/builder_union_all.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ impl PipelineBuilder {
4242
self.main_pipeline.extend_sinks(left_sinks);
4343
self.main_pipeline.extend_sinks(right_sinks);
4444

45-
match self.ctx.get_settings().get_enable_parallel_union_all()? {
45+
let enable_parallel_union_all = self.ctx.get_settings().get_enable_parallel_union_all()?
46+
|| self.ctx.get_settings().get_grouping_sets_to_union()?;
47+
match enable_parallel_union_all {
4648
true => self.main_pipeline.resize(outputs, false),
4749
false => self.main_pipeline.sequence_group(sequence_groups, outputs),
4850
}

src/query/settings/src/settings_default.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,13 @@ impl DefaultSettings {
513513
scope: SettingScope::Both,
514514
range: Some(SettingRange::Numeric(0..=1)),
515515
}),
516+
("grouping_sets_to_union", DefaultSettingValue {
517+
value: UserSettingValue::UInt64(0),
518+
desc: "Enables grouping sets to union.",
519+
mode: SettingMode::Both,
520+
scope: SettingScope::Both,
521+
range: Some(SettingRange::Numeric(0..=1)),
522+
}),
516523
("storage_fetch_part_num", DefaultSettingValue {
517524
value: UserSettingValue::UInt64(2),
518525
desc: "Sets the number of partitions that are fetched in parallel from storage during query execution.",

src/query/settings/src/settings_getter_setter.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -516,6 +516,10 @@ impl Settings {
516516
self.try_get_string("group_by_shuffle_mode")
517517
}
518518

519+
/// Returns `true` when the `grouping_sets_to_union` setting is set to 1.
///
/// Propagates any error raised while reading the setting.
pub fn get_grouping_sets_to_union(&self) -> Result<bool> {
520+
Ok(self.try_get_u64("grouping_sets_to_union")? == 1)
521+
}
522+
519523
/// Returns `true` when the `efficiently_memory_group_by` setting is set to 1.
///
/// Propagates any error raised while reading the setting.
pub fn get_efficiently_memory_group_by(&self) -> Result<bool> {
520524
Ok(self.try_get_u64("efficiently_memory_group_by")? == 1)
521525
}

src/query/sql/src/executor/physical_plans/physical_aggregate_final.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -232,13 +232,13 @@ impl PhysicalPlanBuilder {
232232
settings.get_enable_experimental_aggregate_hashtable()?;
233233

234234
if let Some(grouping_sets) = agg.grouping_sets.as_ref() {
235-
assert_eq!(grouping_sets.dup_group_items.len(), group_items.len() - 1); // ignore `_grouping_id`.
236-
// If the aggregation function argument if a group item,
237-
// we cannot use the group item directly.
238-
// It's because the group item will be wrapped with nullable and fill dummy NULLs (in `AggregateExpand` plan),
239-
// which will cause panic while executing aggregation function.
240-
// To avoid the panic, we will duplicate (`Arc::clone`) original group item columns in `AggregateExpand`,
241-
// we should use these columns instead.
235+
// ignore `_grouping_id`.
236+
// If the aggregation function argument is a group item,
237+
// we cannot use the group item directly.
238+
// It's because the group item will be wrapped with nullable and fill dummy NULLs (in `AggregateExpand` plan),
239+
// which will cause panic while executing aggregation function.
240+
// To avoid the panic, we will duplicate (`Arc::clone`) original group item columns in `AggregateExpand`,
241+
// we should use these columns instead.
242242
for func in agg_funcs.iter_mut() {
243243
for arg in func.arg_indices.iter_mut() {
244244
if let Some(pos) = group_items.iter().position(|g| g == arg) {
@@ -480,7 +480,6 @@ impl PhysicalPlanBuilder {
480480

481481
if let Some(grouping_sets) = agg.grouping_sets.as_ref() {
482482
// The argument types are wrapped nullable due to `AggregateExpand` plan. We should recover them to original types.
483-
assert_eq!(grouping_sets.dup_group_items.len(), group_items.len() - 1); // ignore `_grouping_id`.
484483
for func in agg_funcs.iter_mut() {
485484
for (arg, ty) in func.arg_indices.iter_mut().zip(func.sig.args.iter_mut()) {
486485
if let Some(pos) = group_items.iter().position(|g| g == arg) {

src/query/sql/src/planner/binder/aggregate.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -771,6 +771,7 @@ impl Binder {
771771
);
772772
dup_group_items.push((dummy.index, *dummy.data_type));
773773
}
774+
774775
// Add a virtual column `_grouping_id` to group items.
775776
let grouping_id_column = self.create_derived_column_binding(
776777
"_grouping_id".to_string(),
@@ -783,14 +784,16 @@ impl Binder {
783784
column: grouping_id_column.clone(),
784785
};
785786

786-
agg_info.group_items_map.insert(
787-
bound_grouping_id_col.clone().into(),
788-
agg_info.group_items.len(),
789-
);
790-
agg_info.group_items.push(ScalarItem {
791-
index: grouping_id_column.index,
792-
scalar: bound_grouping_id_col.into(),
793-
});
787+
if !self.ctx.get_settings().get_grouping_sets_to_union()? {
788+
agg_info.group_items_map.insert(
789+
bound_grouping_id_col.clone().into(),
790+
agg_info.group_items.len(),
791+
);
792+
agg_info.group_items.push(ScalarItem {
793+
index: grouping_id_column.index,
794+
scalar: bound_grouping_id_col.into(),
795+
});
796+
}
794797

795798
let grouping_sets_info = GroupingSetsInfo {
796799
grouping_id_column,

src/query/sql/src/planner/optimizer/optimizer_context.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use databend_common_catalog::table_context::TableContext;
1919
use educe::Educe;
2020
use parking_lot::RwLock;
2121

22+
use crate::optimizer::optimizers::rule::RuleID;
2223
use crate::planner::QueryExecutor;
2324
use crate::MetadataRef;
2425

@@ -152,6 +153,13 @@ impl OptimizerContext {
152153
/// Check if an optimizer or rule is disabled based on optimizer_skip_list setting
153154
pub fn is_optimizer_disabled(self: &Arc<Self>, name: &str) -> bool {
154155
let settings = self.get_table_ctx().get_settings();
156+
157+
if !settings.get_grouping_sets_to_union().unwrap_or_default()
158+
&& name == RuleID::GroupingSetsToUnion.to_string()
159+
{
160+
return true;
161+
}
162+
155163
match settings.get_optimizer_skip_list() {
156164
Ok(skip_list) if !skip_list.is_empty() => {
157165
let name_lower = name.to_lowercase();

src/query/sql/src/planner/optimizer/optimizers/rule/agg_rules/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@
1515
mod agg_index;
1616
mod rule_eager_aggregation;
1717
mod rule_fold_count_aggregate;
18+
mod rule_grouping_sets_to_union;
1819
mod rule_push_down_filter_aggregate;
1920
mod rule_push_down_limit_aggregate;
2021
mod rule_split_aggregate;
2122
mod rule_try_apply_agg_index;
2223

2324
pub use rule_eager_aggregation::RuleEagerAggregation;
2425
pub use rule_fold_count_aggregate::RuleFoldCountAggregate;
26+
pub use rule_grouping_sets_to_union::RuleGroupingSetsToUnion;
2527
pub use rule_push_down_filter_aggregate::RulePushDownFilterAggregate;
2628
pub use rule_push_down_limit_aggregate::RulePushDownRankLimitAggregate;
2729
pub use rule_split_aggregate::RuleSplitAggregate;
Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::hash::DefaultHasher;
16+
use std::hash::Hash;
17+
use std::hash::Hasher;
18+
use std::sync::Arc;
19+
20+
use databend_common_exception::Result;
21+
use databend_common_expression::types::NumberScalar;
22+
use databend_common_expression::Scalar;
23+
24+
use crate::optimizer::ir::Matcher;
25+
use crate::optimizer::ir::RelExpr;
26+
use crate::optimizer::ir::SExpr;
27+
use crate::optimizer::optimizers::rule::Rule;
28+
use crate::optimizer::optimizers::rule::RuleID;
29+
use crate::optimizer::optimizers::rule::TransformResult;
30+
use crate::plans::walk_expr_mut;
31+
use crate::plans::Aggregate;
32+
use crate::plans::AggregateMode;
33+
use crate::plans::CastExpr;
34+
use crate::plans::ConstantExpr;
35+
use crate::plans::EvalScalar;
36+
use crate::plans::MaterializeCTERef;
37+
use crate::plans::MaterializedCTE;
38+
use crate::plans::RelOp;
39+
use crate::plans::Sequence;
40+
use crate::plans::UnionAll;
41+
use crate::plans::VisitorMut;
42+
use crate::IndexType;
43+
use crate::ScalarExpr;
44+
45+
// TODO
46+
const ID: RuleID = RuleID::GroupingSetsToUnion;
47+
// Split `Grouping Sets` into `Union All` of `Group by`
48+
// Eg:
49+
// select number % 10 AS a, number % 3 AS b, number % 4 AS c
50+
// from numbers(100000000)
51+
// group by grouping sets((a,b),(a,c));
52+
53+
// INTO:
54+
55+
// select number % 10 AS a, number % 3 AS b, number % 4 AS c
56+
// from numbers(100000000)
57+
// group by a,b
58+
// union all
59+
// select number % 10 AS a, number % 3 AS b, number % 4 AS c
60+
// from numbers(100000000)
61+
// group by a,c
62+
//
63+
pub struct RuleGroupingSetsToUnion {
64+
id: RuleID,
65+
matchers: Vec<Matcher>,
66+
}
67+
68+
impl RuleGroupingSetsToUnion {
69+
pub fn new() -> Self {
70+
Self {
71+
id: ID,
72+
// Aggregate
73+
// \
74+
// *
75+
matchers: vec![Matcher::MatchOp {
76+
op_type: RelOp::EvalScalar,
77+
children: vec![Matcher::MatchOp {
78+
op_type: RelOp::Aggregate,
79+
children: vec![Matcher::Leaf],
80+
}],
81+
}],
82+
}
83+
}
84+
}
85+
86+
// Must go before `RuleSplitAggregate`
87+
impl Rule for RuleGroupingSetsToUnion {
88+
fn id(&self) -> RuleID {
89+
self.id
90+
}
91+
92+
fn apply(&self, s_expr: &SExpr, state: &mut TransformResult) -> Result<()> {
93+
let eval_scalar: EvalScalar = s_expr.plan().clone().try_into()?;
94+
let agg: Aggregate = s_expr.child(0)?.plan().clone().try_into()?;
95+
if agg.mode != AggregateMode::Initial {
96+
return Ok(());
97+
}
98+
99+
let agg_input = s_expr.child(0)?.child(0)?;
100+
let agg_input_columns: Vec<IndexType> = RelExpr::with_s_expr(agg_input)
101+
.derive_relational_prop()?
102+
.output_columns
103+
.iter()
104+
.cloned()
105+
.collect();
106+
107+
if let Some(grouping_sets) = &agg.grouping_sets {
108+
if !grouping_sets.sets.is_empty() {
109+
let mut children = Vec::with_capacity(grouping_sets.sets.len());
110+
111+
let mut hasher = DefaultHasher::new();
112+
agg.grouping_sets.hash(&mut hasher);
113+
let hash = hasher.finish();
114+
let temp_cte_name = format!("cte_groupingsets_{hash}");
115+
116+
let cte_materialized_sexpr = SExpr::create_unary(
117+
MaterializedCTE::new(temp_cte_name.clone(), None, Some(1)),
118+
agg_input.clone(),
119+
);
120+
121+
let cte_consumer = SExpr::create_leaf(MaterializeCTERef {
122+
cte_name: temp_cte_name,
123+
output_columns: agg_input_columns.clone(),
124+
def: agg_input.clone(),
125+
});
126+
127+
let mask = (1 << grouping_sets.dup_group_items.len()) - 1;
128+
let group_bys = agg
129+
.group_items
130+
.iter()
131+
.map(|i| {
132+
agg_input_columns
133+
.iter()
134+
.position(|t| *t == i.index)
135+
.unwrap()
136+
})
137+
.collect::<Vec<_>>();
138+
139+
for set in &grouping_sets.sets {
140+
let mut id = 0;
141+
142+
// For element in `group_bys`,
143+
// if it is in current grouping set: set 0, else: set 1. (1 represents it will be NULL in grouping)
144+
// Example: GROUP BY GROUPING SETS ((a, b), (a), (b), ())
145+
// group_bys: [a, b]
146+
// grouping_sets: [[0, 1], [0], [1], []]
147+
// grouping_ids: 00, 01, 10, 11
148+
149+
for g in set {
150+
let i = group_bys.iter().position(|t| *t == *g).unwrap();
151+
id |= 1 << i;
152+
}
153+
let grouping_id = !id & mask;
154+
155+
let mut eval_scalar = eval_scalar.clone();
156+
let mut agg = agg.clone();
157+
agg.grouping_sets = None;
158+
159+
let null_group_ids: Vec<IndexType> = agg
160+
.group_items
161+
.iter()
162+
.map(|i| i.index)
163+
.filter(|index| !set.contains(index))
164+
.clone()
165+
.collect();
166+
167+
agg.group_items.retain(|x| set.contains(&x.index));
168+
let group_ids: Vec<IndexType> =
169+
agg.group_items.iter().map(|i| i.index).collect();
170+
171+
let mut visitor = ReplaceColumnForGroupingSetsVisitor {
172+
group_indexes: group_ids,
173+
exclude_group_indexes: null_group_ids,
174+
grouping_id_index: grouping_sets.grouping_id_index,
175+
grouping_id_value: grouping_id,
176+
};
177+
178+
for scalar in eval_scalar.items.iter_mut() {
179+
visitor.visit(&mut scalar.scalar)?;
180+
}
181+
182+
let agg_plan = SExpr::create_unary(agg, cte_consumer.clone());
183+
let eval_plan = SExpr::create_unary(eval_scalar, agg_plan);
184+
children.push(eval_plan);
185+
}
186+
187+
// fold children into result
188+
let mut result = children.first().unwrap().clone();
189+
for other in children.into_iter().skip(1) {
190+
let left_outputs: Vec<(IndexType, Option<ScalarExpr>)> =
191+
eval_scalar.items.iter().map(|x| (x.index, None)).collect();
192+
let right_outputs = left_outputs.clone();
193+
194+
let union_plan = UnionAll {
195+
left_outputs,
196+
right_outputs,
197+
cte_scan_names: vec![],
198+
output_indexes: eval_scalar.items.iter().map(|x| x.index).collect(),
199+
};
200+
result = SExpr::create_binary(Arc::new(union_plan.into()), result, other);
201+
}
202+
result = SExpr::create_binary(Sequence, cte_materialized_sexpr, result);
203+
state.add_result(result);
204+
return Ok(());
205+
}
206+
}
207+
Ok(())
208+
}
209+
210+
fn matchers(&self) -> &[Matcher] {
211+
&self.matchers
212+
}
213+
}
214+
215+
impl Default for RuleGroupingSetsToUnion {
216+
fn default() -> Self {
217+
Self::new()
218+
}
219+
}
220+
221+
/// Expression rewriter applied to the `EvalScalar` items of one union branch.
struct ReplaceColumnForGroupingSetsVisitor {
222+
// Column indexes kept as group items in this branch; references are wrapped in a try-cast to nullable.
group_indexes: Vec<IndexType>,
223+
// Column indexes of group items dropped from this branch; references become typed NULL constants.
exclude_group_indexes: Vec<IndexType>,
224+
// Column index whose references are replaced by the constant `grouping_id_value`.
grouping_id_index: IndexType,
225+
// Grouping id emitted as a `UInt32` constant for this branch.
grouping_id_value: u32,
226+
}
227+
228+
impl VisitorMut<'_> for ReplaceColumnForGroupingSetsVisitor {
229+
fn visit(&mut self, expr: &mut ScalarExpr) -> Result<()> {
230+
let old = expr.clone();
231+
232+
if let ScalarExpr::BoundColumnRef(col) = expr {
233+
if self.group_indexes.contains(&col.column.index) {
234+
*expr = ScalarExpr::CastExpr(CastExpr {
235+
argument: Box::new(old),
236+
is_try: true,
237+
target_type: Box::new(col.column.data_type.wrap_nullable()),
238+
span: col.span,
239+
});
240+
} else if self.exclude_group_indexes.contains(&col.column.index) {
241+
*expr = ScalarExpr::TypedConstantExpr(
242+
ConstantExpr {
243+
value: Scalar::Null,
244+
span: col.span,
245+
},
246+
col.column.data_type.wrap_nullable(),
247+
);
248+
} else if self.grouping_id_index == col.column.index {
249+
*expr = ScalarExpr::ConstantExpr(ConstantExpr {
250+
value: Scalar::Number(NumberScalar::UInt32(self.grouping_id_value)),
251+
span: col.span,
252+
});
253+
}
254+
return Ok(());
255+
}
256+
walk_expr_mut(self, expr)
257+
}
258+
}

0 commit comments

Comments
 (0)