Skip to content

Commit e4a1574

Browse files
committed
chore(cubestore): Upgrade DF: Pretty-print sort column improvements for MergeSort and TopK Aggregates
1 parent 461a5d0 commit e4a1574

File tree

4 files changed

+34
-27
lines changed

4 files changed

+34
-27
lines changed

rust/cubestore/cubestore-sql-tests/src/tests.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3403,13 +3403,13 @@ async fn planning_inplace_aggregate2(service: Box<dyn SqlClient>) {
34033403
assert_eq!(
34043404
pp_phys_plan_ext(p.router.as_ref(), &verbose),
34053405
"Projection, [url, sum(Data.hits)@1:hits]\
3406-
\n AggregateTopK, limit: 10, sortBy: [2 desc null last]\
3406+
\n AggregateTopK, limit: 10, sortBy: [2 desc nulls last]\
34073407
\n ClusterSend, partitions: [[1, 2]], sort_order: [1]"
34083408
);
34093409
assert_eq!(
34103410
pp_phys_plan_ext(p.worker.as_ref(), &verbose),
34113411
"Projection, [url, sum(Data.hits)@1:hits]\
3412-
\n AggregateTopK, limit: 10, sortBy: [2 desc null last]\
3412+
\n AggregateTopK, limit: 10, sortBy: [2 desc nulls last]\
34133413
\n Worker, sort_order: [1]\
34143414
\n Sort, by: [sum(Data.hits)@1 desc nulls last], sort_order: [1]\
34153415
\n LinearSingleAggregate\

rust/cubestore/cubestore/src/queryplanner/planning.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2229,7 +2229,7 @@ pub mod tests {
22292229
assert_eq!(
22302230
pretty_printers::pp_plan_ext(&plan, &with_sort_by),
22312231
"Projection, [amount, customer]\
2232-
\n ClusterAggregateTopK, limit: 10, sortBy: [2 desc null last]\
2232+
\n ClusterAggregateTopK, limit: 10, sortBy: [2 desc nulls last]\
22332233
\n Scan s.Orders, source: CubeTable(index: by_customer:3:[]:sort_on[order_customer]), fields: [order_customer, order_amount]"
22342234
);
22352235

@@ -2243,7 +2243,7 @@ pub mod tests {
22432243
assert_eq!(
22442244
pretty_printers::pp_plan_ext(&plan, &with_sort_by),
22452245
"Projection, [customer, amount]\
2246-
\n ClusterAggregateTopK, limit: 10, sortBy: [2 null last]\
2246+
\n ClusterAggregateTopK, limit: 10, sortBy: [2 nulls last]\
22472247
\n Scan s.Orders, source: CubeTable(index: by_customer:3:[]:sort_on[order_customer]), fields: [order_customer, order_amount]"
22482248
);
22492249

@@ -2261,7 +2261,7 @@ pub mod tests {
22612261
assert_eq!(
22622262
pretty_printers::pp_plan_ext(&plan, &verbose),
22632263
"Projection, [customer, amount, min_amount, max_amount]\
2264-
\n ClusterAggregateTopK, limit: 10, aggs: [sum(s.Orders.order_amount), min(s.Orders.order_amount), max(s.Orders.order_amount)], sortBy: [3 desc null last, 2 null last]\
2264+
\n ClusterAggregateTopK, limit: 10, aggs: [sum(s.Orders.order_amount), min(s.Orders.order_amount), max(s.Orders.order_amount)], sortBy: [3 desc nulls last, 2 nulls last]\
22652265
\n Scan s.Orders, source: CubeTable(index: by_customer:3:[]:sort_on[order_customer]), fields: [order_customer, order_amount]"
22662266
);
22672267

rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use datafusion::logical_expr::{
1111
Aggregate, EmptyRelation, Explain, Extension, FetchType, Filter, Join, Limit, LogicalPlan,
1212
Projection, Repartition, SkipType, Sort, TableScan, Union, Window,
1313
};
14-
use datafusion::physical_expr::{AcrossPartitions, ConstExpr};
14+
use datafusion::physical_expr::{AcrossPartitions, ConstExpr, LexOrdering};
1515
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
1616
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
1717
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -479,14 +479,34 @@ pub fn pp_sort_columns(first_agg: usize, cs: &[SortColumn]) -> String {
479479
r += " desc";
480480
}
481481
if !c.nulls_first {
482-
r += " null last";
482+
r += " nulls last";
483483
}
484484
r
485485
})
486486
.join(", ")
487487
)
488488
}
489489

490+
fn pp_append_sort_by(out: &mut String, ordering: &LexOrdering) {
491+
let _ = write!(
492+
out,
493+
", by: [{}]",
494+
ordering
495+
.iter()
496+
.map(|e| {
497+
let mut r = format!("{}", e.expr);
498+
if e.options.descending {
499+
r += " desc";
500+
}
501+
if !e.options.nulls_first {
502+
r += " nulls last";
503+
}
504+
r
505+
})
506+
.join(", "),
507+
);
508+
}
509+
490510
fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, out: &mut String) {
491511
if p.as_any().is::<CheckMemoryExec>() && !o.show_check_memory_nodes {
492512
//We don't show CheckMemoryExec in plan by default
@@ -588,23 +608,9 @@ fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, ou
588608
}
589609
} else if let Some(s) = a.downcast_ref::<SortExec>() {
590610
*out += "Sort";
611+
591612
if o.show_sort_by {
592-
*out += &format!(
593-
", by: [{}]",
594-
s.expr()
595-
.iter()
596-
.map(|e| {
597-
let mut r = format!("{}", e.expr);
598-
if e.options.descending {
599-
r += " desc";
600-
}
601-
if !e.options.nulls_first {
602-
r += " nulls last";
603-
}
604-
r
605-
})
606-
.join(", ")
607-
);
613+
pp_append_sort_by(out, s.expr());
608614
}
609615
if let Some(fetch) = s.fetch() {
610616
*out += &format!(", fetch: {}", fetch);
@@ -656,8 +662,9 @@ fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, ou
656662
*out += "CoalescePartitions";
657663
} else if let Some(s) = a.downcast_ref::<SortPreservingMergeExec>() {
658664
*out += "MergeSort";
659-
// } else if let Some(_) = a.downcast_ref::<MergeReSortExec>() {
660-
// *out += "MergeResort";
665+
if o.show_sort_by {
666+
pp_append_sort_by(out, s.expr());
667+
}
661668
if let Some(fetch) = s.fetch() {
662669
*out += &format!(", fetch: {}", fetch);
663670
}

rust/cubestore/cubestore/src/queryplanner/topk/execute.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1294,7 +1294,7 @@ mod tests {
12941294
.unwrap();
12951295
assert_eq!(r, vec![vec![0, 0]]);
12961296

1297-
// Ascending, null first.
1297+
// Ascending, nulls first.
12981298
proto.change_order(vec![SortColumn {
12991299
agg_index: 0,
13001300
asc: true,
@@ -1315,7 +1315,7 @@ mod tests {
13151315
.unwrap();
13161316
assert_eq!(r, vec![vec![Some(2), None]]);
13171317

1318-
// Ascending, null last.
1318+
// Ascending, nulls last.
13191319
proto.change_order(vec![SortColumn {
13201320
agg_index: 0,
13211321
asc: true,

0 commit comments

Comments
 (0)