Skip to content

Commit c9b8906

Browse files
committed
chore(cubestore): Upgrade DF 46: Pretty-printing improvements
- TraceDataLoadedExec nodes are now pretty-printed, consistent with the original.
- CoalesceBatches is now printed without the "Exec" suffix.
1 parent a2c1f1d commit c9b8906

File tree

4 files changed

+52
-23
lines changed

4 files changed

+52
-23
lines changed

rust/cubestore/cubestore-sql-tests/src/tests.rs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3095,7 +3095,7 @@ async fn planning_inplace_aggregate(service: Box<dyn SqlClient>) {
30953095
"PartiallySortedFinalAggregate, partitions: 1\
30963096
\n Worker, partitions: 1\
30973097
\n PartiallySortedPartialAggregate, partitions: 1\
3098-
\n CoalesceBatchesExec, partitions: 1\
3098+
\n CoalesceBatches, partitions: 1\
30993099
\n Filter, partitions: 1\
31003100
\n Scan, index: default:2:[2]:sort_on[url, segment, day], fields: *, partitions: 1\
31013101
\n Sort, partitions: 1\
@@ -3113,7 +3113,7 @@ async fn planning_inplace_aggregate(service: Box<dyn SqlClient>) {
31133113
"PartiallySortedFinalAggregate, partitions: 1\
31143114
\n Worker, partitions: 1\
31153115
\n PartiallySortedPartialAggregate, partitions: 1\
3116-
\n CoalesceBatchesExec, partitions: 1\
3116+
\n CoalesceBatches, partitions: 1\
31173117
\n Filter, partitions: 1\
31183118
\n Scan, index: default:2:[2]:sort_on[url, segment, day], fields: *, partitions: 1\
31193119
\n Sort, partitions: 1\
@@ -3198,7 +3198,7 @@ async fn planning_hints(service: Box<dyn SqlClient>) {
31983198
\n Worker, single_vals: [1]\
31993199
\n CoalescePartitions, single_vals: [1]\
32003200
\n Projection, [id3, id2], single_vals: [1]\
3201-
\n CoalesceBatchesExec, single_vals: [0]\
3201+
\n CoalesceBatches, single_vals: [0]\
32023202
\n Filter, single_vals: [0]\
32033203
\n Scan, index: default:1:[1], fields: [id2, id3]\
32043204
\n Empty"
@@ -3212,7 +3212,7 @@ async fn planning_hints(service: Box<dyn SqlClient>) {
32123212
assert_eq!(
32133213
pp_phys_plan_ext(p.worker.as_ref(), &show_hints),
32143214
"Worker, sort_order: [0]\
3215-
\n CoalesceBatchesExec, sort_order: [0]\
3215+
\n CoalesceBatches, sort_order: [0]\
32163216
\n Filter, sort_order: [0]\
32173217
\n Scan, index: default:1:[1]:sort_on[id1, id2], fields: *, sort_order: [0, 1, 2]\
32183218
\n Sort, sort_order: [0, 1, 2]\
@@ -3225,7 +3225,7 @@ async fn planning_hints(service: Box<dyn SqlClient>) {
32253225
assert_eq!(
32263226
pp_phys_plan_ext(p.worker.as_ref(), &show_hints),
32273227
"Worker, sort_order: [0, 1]\
3228-
\n CoalesceBatchesExec, sort_order: [0, 1]\
3228+
\n CoalesceBatches, sort_order: [0, 1]\
32293229
\n Filter, sort_order: [0, 1]\
32303230
\n CoalescePartitions, sort_order: [0, 1, 2]\
32313231
\n Scan, index: default:1:[1], fields: *, sort_order: [0, 1, 2]\
@@ -3284,13 +3284,13 @@ async fn planning_inplace_aggregate2(service: Box<dyn SqlClient>) {
32843284
\n CoalescePartitions\
32853285
\n Union\
32863286
\n CoalescePartitions\
3287-
\n CoalesceBatchesExec\
3287+
\n CoalesceBatches\
32883288
\n Filter\
32893289
\n Scan, index: default:1:[1], fields: *, sort_order: [0, 1, 2, 3, 4]\
32903290
\n Sort, by: [allowed@0, site_id@1, url@2, day@3, hits@4], sort_order: [0, 1, 2, 3, 4]\
32913291
\n Empty\
32923292
\n CoalescePartitions\
3293-
\n CoalesceBatchesExec\
3293+
\n CoalesceBatches\
32943294
\n Filter\
32953295
\n Scan, index: default:2:[2], fields: *, sort_order: [0, 1, 2, 3, 4]\
32963296
\n Sort, by: [allowed@0, site_id@1, url@2, day@3, hits@4], sort_order: [0, 1, 2, 3, 4]\
@@ -3547,7 +3547,7 @@ async fn planning_simple(service: Box<dyn SqlClient>) {
35473547
assert_eq!(
35483548
pp_phys_plan(p.worker.as_ref()),
35493549
"Worker\
3550-
\n CoalesceBatchesExec\
3550+
\n CoalesceBatches\
35513551
\n Filter\
35523552
\n CoalescePartitions\
35533553
\n Scan, index: default:1:[1], fields: [id, amount]\
@@ -3573,7 +3573,7 @@ async fn planning_simple(service: Box<dyn SqlClient>) {
35733573
pp_phys_plan(p.worker.as_ref()),
35743574
"Sort\
35753575
\n Worker\
3576-
\n CoalesceBatchesExec\
3576+
\n CoalesceBatches\
35773577
\n Filter\
35783578
\n CoalescePartitions\
35793579
\n Scan, index: default:1:[1], fields: [id, amount]\
@@ -3599,7 +3599,7 @@ async fn planning_simple(service: Box<dyn SqlClient>) {
35993599
pp_phys_plan(p.worker.as_ref()),
36003600
"GlobalLimit, n: 10\
36013601
\n Worker\
3602-
\n CoalesceBatchesExec\
3602+
\n CoalesceBatches\
36033603
\n Filter\
36043604
\n CoalescePartitions\
36053605
\n Scan, index: default:1:[1], fields: [id, amount]\
@@ -3692,7 +3692,7 @@ async fn planning_filter_index_selection(service: Box<dyn SqlClient>) {
36923692
"SortedFinalAggregate\
36933693
\n Worker\
36943694
\n SortedPartialAggregate\
3695-
\n CoalesceBatchesExec\
3695+
\n CoalesceBatches\
36963696
\n Filter\
36973697
\n Scan, index: cb:2:[2]:sort_on[c, b], fields: [b, c, amount]\
36983698
\n Sort\
@@ -3716,7 +3716,7 @@ async fn planning_filter_index_selection(service: Box<dyn SqlClient>) {
37163716
\n Worker\
37173717
\n CoalescePartitions\
37183718
\n LinearPartialAggregate\
3719-
\n CoalesceBatchesExec\
3719+
\n CoalesceBatches\
37203720
\n Filter\
37213721
\n Scan, index: cb:2:[2], fields: [b, c, amount]\
37223722
\n Sort\
@@ -3741,7 +3741,7 @@ async fn planning_filter_index_selection(service: Box<dyn SqlClient>) {
37413741
"SortedFinalAggregate\
37423742
\n Worker\
37433743
\n SortedPartialAggregate\
3744-
\n CoalesceBatchesExec\
3744+
\n CoalesceBatches\
37453745
\n Filter\
37463746
\n Scan, index: cb:2:[2]:sort_on[c, b], fields: [a, b, c, amount]\
37473747
\n Sort\
@@ -3911,15 +3911,15 @@ async fn planning_3_table_joins(service: Box<dyn SqlClient>) {
39113911
\n MergeJoin, on: [product_id@1 = product_id@0]\
39123912
\n Projection, [order_id, product_id, customer_name]\
39133913
\n MergeJoin, on: [customer_id@1 = customer_id@0]\
3914-
\n CoalesceBatchesExec\
3914+
\n CoalesceBatches\
39153915
\n Filter, predicate: product_id@2 = 125\
39163916
\n Scan, index: by_product_customer:3:[3]:sort_on[product_id, customer_id], fields: [order_id, customer_id, product_id], predicate: BinaryExpr(BinaryExpr { left: Column(Column { relation: None, name: \"product_id\" }), op: Eq, right: Literal(Int64(125)) })\
39173917
\n Sort\
39183918
\n Empty\
39193919
\n Scan, index: default:4:[4]:sort_on[customer_id], fields: *\
39203920
\n Sort\
39213921
\n Empty\
3922-
\n CoalesceBatchesExec\
3922+
\n CoalesceBatches\
39233923
\n Filter, predicate: product_id@0 = 125\
39243924
\n Scan, index: default:5:[5]:sort_on[product_id], fields: *, predicate: BinaryExpr(BinaryExpr { left: Column(Column { relation: None, name: \"product_id\" }), op: Eq, right: Literal(Int64(125)) })\
39253925
\n Sort\
@@ -7530,7 +7530,7 @@ async fn planning_aggregate_index(service: Box<dyn SqlClient>) {
75307530
"SortedFinalAggregate\
75317531
\n Worker\
75327532
\n SortedPartialAggregate\
7533-
\n CoalesceBatchesExec\
7533+
\n CoalesceBatches\
75347534
\n Filter\
75357535
\n Scan, index: default:3:[3]:sort_on[a, b, c], fields: *\
75367536
\n Sort\
@@ -7576,7 +7576,7 @@ async fn planning_aggregate_index(service: Box<dyn SqlClient>) {
75767576
"SortedFinalAggregate\
75777577
\n Worker\
75787578
\n SortedPartialAggregate\
7579-
\n CoalesceBatchesExec\
7579+
\n CoalesceBatches\
75807580
\n Filter\
75817581
\n Scan, index: aggr_index:2:[2]:sort_on[a, b], fields: [a, b, a_sum]\
75827582
\n Sort\

rust/cubestore/cubestore/src/queryplanner/flatten_union.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use datafusion::optimizer::OptimizerConfig;
77
use std::fmt::Debug;
88
use std::sync::Arc;
99

10+
// TODO upgrade DF: Remove? We have EliminateNestedUnion.
1011
#[derive(Debug)]
1112
pub struct FlattenUnion;
1213

rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@ use datafusion::logical_expr::{
1111
Aggregate, EmptyRelation, Explain, Extension, FetchType, Filter, Join, Limit, LogicalPlan, Projection, Repartition, SkipType, Sort, TableScan, Union, Window
1212
};
1313
use datafusion::physical_expr::{AcrossPartitions, ConstExpr};
14+
use datafusion::physical_optimizer::pruning;
1415
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
16+
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
1517
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
1618
use datafusion::physical_plan::filter::FilterExec;
1719
use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
18-
use datafusion::physical_plan::{DefaultDisplay, DisplayAs, DisplayFormatType, ExecutionPlan, InputOrderMode, PlanProperties};
20+
use datafusion::physical_plan::{DefaultDisplay, ExecutionPlan, InputOrderMode, PlanProperties};
1921
use datafusion::prelude::Expr;
2022
use datafusion_datasource::file_scan_config::FileScanConfig;
2123
use datafusion_datasource::memory::MemoryExec;
@@ -37,7 +39,6 @@ use crate::queryplanner::serialized_plan::{IndexSnapshot, RowRange};
3739
use crate::queryplanner::tail_limit::TailLimitExec;
3840
use crate::queryplanner::topk::SortColumn;
3941
use crate::queryplanner::topk::{AggregateTopKExec, ClusterAggregateTopKUpper, ClusterAggregateTopKLower};
40-
use crate::queryplanner::trace_data_loaded::TraceDataLoadedExec;
4142
use crate::queryplanner::{CubeTableLogical, InfoSchemaTableProvider, QueryPlan};
4243
use crate::streaming::topic_table_provider::TopicTableProvider;
4344
use datafusion::physical_plan::empty::EmptyExec;
@@ -59,10 +60,12 @@ pub struct PPOptions {
5960
pub show_output_hints: bool,
6061
pub show_check_memory_nodes: bool,
6162
pub show_partitions: bool,
63+
pub show_metrics: bool,
6264
pub traverse_past_clustersend: bool,
6365
}
6466

6567
impl PPOptions {
68+
// TODO upgrade DF: Rename
6669
#[allow(unused)]
6770
pub fn show_all() -> PPOptions {
6871
PPOptions {
@@ -73,6 +76,7 @@ impl PPOptions {
7376
show_output_hints: true,
7477
show_check_memory_nodes: true,
7578
show_partitions: true,
79+
show_metrics: false, // yeah
7680
traverse_past_clustersend: false,
7781
}
7882
}
@@ -470,8 +474,7 @@ pub fn pp_sort_columns(first_agg: usize, cs: &[SortColumn]) -> String {
470474
}
471475

472476
fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, out: &mut String) {
473-
if (p.as_any().is::<CheckMemoryExec>() || p.as_any().is::<TraceDataLoadedExec>())
474-
&& !o.show_check_memory_nodes
477+
if p.as_any().is::<CheckMemoryExec>() && !o.show_check_memory_nodes
475478
{
476479
//We don't show CheckMemoryExec in plan by default
477480
if let Some(child) = p.children().first() {
@@ -630,6 +633,8 @@ fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, ou
630633
*out += "PanicWorker";
631634
} else if let Some(_) = a.downcast_ref::<WorkerExec>() {
632635
*out += &format!("Worker");
636+
} else if let Some(_) = a.downcast_ref::<CoalesceBatchesExec>() {
637+
*out += "CoalesceBatches";
633638
} else if let Some(_) = a.downcast_ref::<CoalescePartitionsExec>() {
634639
*out += "CoalescePartitions";
635640
} else if let Some(s) = a.downcast_ref::<SortPreservingMergeExec>() {
@@ -676,6 +681,23 @@ fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, ou
676681
"ParquetScan, files: {}",
677682
fse.file_groups.iter().flatten().map(|p| p.object_meta.location.to_string()).join(","),
678683
);
684+
if o.show_filters {
685+
if let Some(predicate) = p.predicate() {
686+
*out += &format!(", predicate: {}", predicate);
687+
}
688+
// pruning_predicate and page_pruning_predicate are derived from
689+
// p.predicate(), and they tend to be more verbose. Note: because we have
690+
// configured the default pushdown_filters = false (default false as of DF
691+
// <= 46.0.1), p.predicate() is not directly used.
692+
693+
// if let Some(pruning_predicate) = p.pruning_predicate() {
694+
// *out += &format!(", pruning_predicate: {}", pruning_predicate.predicate_expr());
695+
// }
696+
// if let Some(page_pruning_predicate) = p.page_pruning_predicate() {
697+
// // If this is uncommented, page_pruning_predicate.predicates() would need to be added to DF.
698+
// *out += &format!(", page_pruning_predicates: [{}]", page_pruning_predicate.predicates().iter().map(|pred| pred.predicate_expr()).join(", "));
699+
// }
700+
}
679701
} else {
680702
*out += &format!("{}", DefaultDisplay(dse));
681703
}
@@ -766,6 +788,12 @@ fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, ou
766788
if o.show_partitions && !skip_show_partitions {
767789
*out += &format!(", partitions: {}", p.properties().output_partitioning().partition_count());
768790
}
791+
792+
if o.show_metrics {
793+
if let Some(m) = p.metrics() {
794+
*out += &format!(", metrics: {}", m);
795+
}
796+
}
769797
}
770798
}
771799

rust/cubestore/cubestore/src/sql/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3360,7 +3360,7 @@ mod tests {
33603360
\n Worker\
33613361
\n CoalescePartitions\
33623362
\n LinearPartialAggregate\
3363-
\n CoalesceBatchesExec\
3363+
\n CoalesceBatches\
33643364
\n Filter\
33653365
\n MergeSort\
33663366
\n Scan, index: default:1:[1]:sort_on[num], fields: *\
@@ -4430,7 +4430,7 @@ mod tests {
44304430
.values()[2] {
44314431
TableValue::String(pp_plan) => {
44324432
let regex = Regex::new(
4433-
r"LinearPartialAggregate\s+CoalesceBatchesExec\s+Filter\s+Scan, index: default:1:\[1\], fields: \[platform, age, amount\]\s+ParquetScan, files: \S*\.chunk\.parquet"
4433+
r"LinearPartialAggregate\s+CoalesceBatches\s+Filter\s+Scan, index: default:1:\[1\], fields: \[platform, age, amount\]\s+ParquetScan, files: \S*\.chunk\.parquet"
44344434
).unwrap();
44354435
let matches = regex.captures_iter(&pp_plan).count();
44364436
assert_eq!(matches, 1, "pp_plan = {}", pp_plan);

0 commit comments

Comments
 (0)