Skip to content

Commit fe9a1b2

Browse files
committed
perf(cube): Make eliminate_nested_union clone less and return Transformed::no more often
1 parent d44f2a0 commit fe9a1b2

File tree

1 file changed

+64
-23
lines changed

1 file changed

+64
-23
lines changed

datafusion/optimizer/src/eliminate_nested_union.rs

Lines changed: 64 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,9 @@
1919
use crate::optimizer::ApplyOrder;
2020
use crate::{OptimizerConfig, OptimizerRule};
2121
use datafusion_common::tree_node::Transformed;
22-
use datafusion_common::Result;
22+
use datafusion_common::{internal_err, DFSchema, Result};
2323
use datafusion_expr::expr_rewriter::coerce_plan_expr_for_schema;
2424
use datafusion_expr::{Distinct, LogicalPlan, Union};
25-
use itertools::Itertools;
2625
use std::sync::Arc;
2726

2827
#[derive(Default, Debug)]
@@ -56,30 +55,56 @@ impl OptimizerRule for EliminateNestedUnion {
5655
) -> Result<Transformed<LogicalPlan>> {
5756
match plan {
5857
LogicalPlan::Union(Union { inputs, schema }) => {
59-
let inputs = inputs
60-
.into_iter()
61-
.flat_map(extract_plans_from_union)
62-
.map(|plan| coerce_plan_expr_for_schema(plan, &schema))
63-
.collect::<Result<Vec<_>>>()?;
58+
let mut has_subunion = false;
59+
let mut flattened_length = 0;
60+
for input in &inputs {
61+
if let LogicalPlan::Union(Union { inputs, schema: _ }) =
62+
input.as_ref()
63+
{
64+
flattened_length += inputs.len();
65+
has_subunion = true;
66+
} else {
67+
flattened_length += 1;
68+
}
69+
}
70+
if !has_subunion {
71+
return Ok(Transformed::no(LogicalPlan::Union(Union {
72+
inputs,
73+
schema,
74+
})));
75+
}
76+
77+
let mut flattened_inputs = Vec::with_capacity(flattened_length);
78+
for input in inputs {
79+
extract_plans_and_coerce_plan_expr_from_union(
80+
input,
81+
schema.as_ref(),
82+
&mut flattened_inputs,
83+
)?;
84+
}
6485

6586
Ok(Transformed::yes(LogicalPlan::Union(Union {
66-
inputs: inputs.into_iter().map(Arc::new).collect_vec(),
87+
inputs: flattened_inputs,
6788
schema,
6889
})))
6990
}
7091
LogicalPlan::Distinct(Distinct::All(nested_plan)) => {
7192
match Arc::unwrap_or_clone(nested_plan) {
7293
LogicalPlan::Union(Union { inputs, schema }) => {
73-
let inputs = inputs
74-
.into_iter()
75-
.map(extract_plan_from_distinct)
76-
.flat_map(extract_plans_from_union)
77-
.map(|plan| coerce_plan_expr_for_schema(plan, &schema))
78-
.collect::<Result<Vec<_>>>()?;
79-
94+
let mut flattened_inputs = Vec::new();
95+
for input in inputs {
96+
let input = extract_plan_from_distinct(input);
97+
extract_plans_and_coerce_plan_expr_from_union(
98+
input,
99+
schema.as_ref(),
100+
&mut flattened_inputs,
101+
)?;
102+
}
103+
104+
// Note: The top-level Union case takes care to return Transformed::no when it can, but this case still does not.
80105
Ok(Transformed::yes(LogicalPlan::Distinct(Distinct::All(
81106
Arc::new(LogicalPlan::Union(Union {
82-
inputs: inputs.into_iter().map(Arc::new).collect_vec(),
107+
inputs: flattened_inputs,
83108
schema: Arc::clone(&schema),
84109
})),
85110
))))
@@ -94,14 +119,30 @@ impl OptimizerRule for EliminateNestedUnion {
94119
}
95120
}
96121

97-
fn extract_plans_from_union(plan: Arc<LogicalPlan>) -> Vec<LogicalPlan> {
98-
match Arc::unwrap_or_clone(plan) {
99-
LogicalPlan::Union(Union { inputs, .. }) => inputs
100-
.into_iter()
101-
.map(Arc::unwrap_or_clone)
102-
.collect::<Vec<_>>(),
103-
plan => vec![plan],
122+
fn extract_plans_and_coerce_plan_expr_from_union(
123+
plan: Arc<LogicalPlan>,
124+
schema: &DFSchema,
125+
onto: &mut Vec<Arc<LogicalPlan>>,
126+
) -> Result<()> {
127+
// `plan` is a child of a Union with the Union having schema `schema`. This takes care to avoid
128+
// unnecessary plan expr coercion for children that aren't also unions.
129+
let LogicalPlan::Union(Union { .. }) = plan.as_ref() else {
130+
onto.push(plan);
131+
return Ok(());
132+
};
133+
134+
let LogicalPlan::Union(Union { inputs, .. }) = Arc::unwrap_or_clone(plan) else {
135+
return internal_err!(
136+
"plan was tested to be a LogicalPlan::Union, but it is not"
137+
);
138+
};
139+
140+
for input in inputs {
141+
let plan = Arc::unwrap_or_clone(input);
142+
let coerced_plan = Arc::new(coerce_plan_expr_for_schema(plan, schema)?);
143+
onto.push(coerced_plan);
104144
}
145+
Ok(())
105146
}
106147

107148
fn extract_plan_from_distinct(plan: Arc<LogicalPlan>) -> Arc<LogicalPlan> {

0 commit comments

Comments
 (0)