Skip to content

Commit 7e24363

Browse files
authored
fix: dist planner rm col req when rm sort (GreptimeTeam#7512)
* aha! Signed-off-by: discord9 <discord9@163.com> * fix: rm col_req in pql sort Signed-off-by: discord9 <discord9@163.com> * ut Signed-off-by: discord9 <discord9@163.com> * docs Signed-off-by: discord9 <discord9@163.com> * typo Signed-off-by: discord9 <discord9@163.com> * more typo Signed-off-by: discord9 <discord9@163.com> --------- Signed-off-by: discord9 <discord9@163.com>
1 parent 3556eb4 commit 7e24363

File tree

2 files changed

+72
-6
lines changed

2 files changed

+72
-6
lines changed

src/query/src/dist_plan/analyzer.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -281,18 +281,18 @@ struct PlanRewriter {
281281
/// 2: Sort: t.pk1+t.pk2
282282
/// 3. Projection: t.number, t.pk1, t.pk2
283283
/// ```
284-
/// `Sort` will make a column requirement for `t.pk1` at level 2.
284+
/// `Sort` will make a column requirement for `t.pk1+t.pk2` at level 2.
285285
/// This makes the `Projection` at level 1 need to add a ref to `t.pk1` as well.
286286
/// So that the expanded plan will be
287287
/// ```ignore
288288
/// Projection: t.number
289-
/// MergeSort: t.pk1
289+
/// MergeSort: t.pk1+t.pk2
290290
/// MergeScan: remote_input=
291291
/// Projection: t.number, "t.pk1+t.pk2" <--- the original `Projection` at level 1 get added with `t.pk1+t.pk2`
292292
/// Sort: t.pk1+t.pk2
293293
/// Projection: t.number, t.pk1, t.pk2
294294
/// ```
295-
/// Making `MergeSort` can have `t.pk1` as input.
295+
/// This allows `MergeSort` to take `t.pk1+t.pk2` as input.
296296
/// Meanwhile `Projection` at level 3 doesn't need to add any new column because 3 > 2
297297
/// and col requirements at level 2 are not applicable to level 3.
298298
///
@@ -392,10 +392,11 @@ impl PlanRewriter {
392392
&& ext_b.node.name() == MergeSortLogicalPlan::name()
393393
{
394394
// revert last `ConditionalCommutative` result for Sort plan in this case.
395-
// `update_column_requirements` left unchanged because Sort won't generate
396-
// new columns or remove existing columns.
395+
// also need to remove any column requirements made by the Sort Plan
396+
// as it may refer to columns that (rightfully) no longer exist later, e.g. removed by an aggregate or projection
397397
self.stage.pop();
398398
self.expand_on_next_part_cond_trans_commutative = false;
399+
self.column_requirements.clear();
399400
}
400401
}
401402
Commutativity::PartialCommutative => {
@@ -680,6 +681,10 @@ struct EnforceDistRequirementRewriter {
680681

681682
impl EnforceDistRequirementRewriter {
682683
fn new(column_requirements: Vec<(HashSet<Column>, usize)>, cur_level: usize) -> Self {
684+
debug!(
685+
"Create EnforceDistRequirementRewriter with column_requirements: {:?} at cur_level: {}",
686+
column_requirements, cur_level
687+
);
683688
Self {
684689
column_requirements,
685690
cur_level,
@@ -733,7 +738,7 @@ impl EnforceDistRequirementRewriter {
733738
.filter(|a| !a.is_empty())
734739
else {
735740
return Err(datafusion_common::DataFusionError::Internal(format!(
736-
"EnforceDistRequirementRewriter: no alias found for required column {original_col} in child plan {child} from original plan {original}",
741+
"EnforceDistRequirementRewriter: no alias found for required column {original_col} at level {level} in current node's child plan: \n{child} from original plan: \n{original}",
737742
)));
738743
};
739744

src/query/src/dist_plan/analyzer/test.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -777,6 +777,67 @@ fn expand_step_aggr_proj() {
777777
assert_eq!(expected, result.to_string());
778778
}
779779

780+
/// Make sure that the `SeriesDivide` special handling correctly cleans up column requirements from its previous sort
781+
#[test]
782+
fn expand_complex_col_req_sort_pql() {
783+
// use logging for better debugging
784+
init_default_ut_logging();
785+
let test_table = TestTable::table_with_name(0, "t".to_string());
786+
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
787+
DfTableProviderAdapter::new(test_table),
788+
)));
789+
790+
let plan = LogicalPlanBuilder::scan_with_filters("t", table_source.clone(), None, vec![])
791+
.unwrap()
792+
.sort(vec![
793+
col("pk1").sort(true, false),
794+
col("pk2").sort(true, false),
795+
col("pk3").sort(true, false), // make some col req here
796+
])
797+
.unwrap()
798+
.build()
799+
.unwrap();
800+
let plan = SeriesDivide::new(
801+
vec!["pk1".to_string(), "pk2".to_string(), "pk3".to_string()],
802+
"ts".to_string(),
803+
plan,
804+
);
805+
let plan = LogicalPlan::Extension(datafusion_expr::Extension {
806+
node: Arc::new(plan),
807+
});
808+
809+
let plan = LogicalPlanBuilder::from(plan)
810+
.aggregate(vec![col("pk1"), col("pk2")], vec![min(col("number"))])
811+
.unwrap()
812+
.sort(vec![
813+
col("pk1").sort(true, false),
814+
col("pk2").sort(true, false),
815+
])
816+
.unwrap()
817+
.project(vec![col("pk1"), col("pk2")])
818+
.unwrap()
819+
.build()
820+
.unwrap();
821+
822+
let config = ConfigOptions::default();
823+
let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap();
824+
825+
let expected = [
826+
"Projection: t.pk1, t.pk2",
827+
" MergeSort: t.pk1 ASC NULLS LAST, t.pk2 ASC NULLS LAST",
828+
" MergeScan [is_placeholder=false, remote_input=[",
829+
"Projection: t.pk1, t.pk2",
830+
" Sort: t.pk1 ASC NULLS LAST, t.pk2 ASC NULLS LAST",
831+
" Aggregate: groupBy=[[t.pk1, t.pk2]], aggr=[[min(t.number)]]",
832+
r#" PromSeriesDivide: tags=["pk1", "pk2", "pk3"]"#,
833+
" Sort: t.pk1 ASC NULLS LAST, t.pk2 ASC NULLS LAST, t.pk3 ASC NULLS LAST",
834+
" TableScan: t",
835+
"]]",
836+
]
837+
.join("\n");
838+
assert_eq!(expected, result.to_string());
839+
}
840+
780841
/// should only expand `Sort`, notice `Sort` before `Aggregate` usually can and
781842
/// will be optimized out, and dist planner shouldn't handle that case, but
782843
/// for now, still handle that be expanding the `Sort` node

0 commit comments

Comments
 (0)