
Commit 3c64eb2

chore(cubestore): Upgrade DF: pretty_printer adjustments: show_partitions, show_schema
1 parent 0d5af0e commit 3c64eb2

File tree: 3 files changed (+46 -28 lines)

rust/cubestore/cubestore/src/queryplanner/optimizations/distributed_partial_aggregate.rs

Lines changed: 0 additions & 1 deletion

@@ -4,7 +4,6 @@ use crate::queryplanner::query_executor::ClusterSendExec;
 use crate::queryplanner::tail_limit::TailLimitExec;
 use crate::queryplanner::topk::AggregateTopKExec;
 use datafusion::error::DataFusionError;
-use datafusion::physical_optimizer::topk_aggregation::TopKAggregation;
 use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::limit::GlobalLimitExec;

rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs

Lines changed: 45 additions & 26 deletions

@@ -1,7 +1,9 @@
 //! Presentation of query plans for use in tests.
 
 use bigdecimal::ToPrimitive;
+use datafusion::arrow::datatypes::Schema;
 use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
+use datafusion::common::DFSchema;
 use datafusion::datasource::physical_plan::ParquetExec;
 use datafusion::datasource::{DefaultTableSource, TableProvider};
 use datafusion::error::DataFusionError;
@@ -34,7 +36,7 @@ use crate::queryplanner::tail_limit::TailLimitExec;
 use crate::queryplanner::topk::SortColumn;
 use crate::queryplanner::topk::{AggregateTopKExec, ClusterAggregateTopKUpper, ClusterAggregateTopKLower};
 use crate::queryplanner::trace_data_loaded::TraceDataLoadedExec;
-use crate::queryplanner::{CubeTableLogical, InfoSchemaTableProvider};
+use crate::queryplanner::{CubeTableLogical, InfoSchemaTableProvider, QueryPlan};
 use crate::streaming::topic_table_provider::TopicTableProvider;
 use datafusion::physical_plan::empty::EmptyExec;
 use datafusion::physical_plan::expressions::Column;
@@ -51,29 +53,24 @@ pub struct PPOptions {
     pub show_filters: bool,
     pub show_sort_by: bool,
     pub show_aggregations: bool,
-    // TODO: Maybe prettify output, name this show_schema.
-    pub debug_schema: bool,
+    pub show_schema: bool,
     // Applies only to physical plan.
     pub show_output_hints: bool,
     pub show_check_memory_nodes: bool,
+    pub show_partitions: bool,
 }
 
 impl PPOptions {
-    pub fn not_everything() -> PPOptions {
+    #[allow(unused)]
+    pub fn everything() -> PPOptions {
         PPOptions {
             show_filters: true,
             show_sort_by: true,
             show_aggregations: true,
-            debug_schema: false,
+            show_schema: true,
             show_output_hints: true,
             show_check_memory_nodes: true,
-        }
-    }
-
-    pub fn truly_everything() -> PPOptions {
-        PPOptions {
-            debug_schema: true,
-            ..PPOptions::not_everything()
+            show_partitions: true,
         }
     }
 
@@ -93,7 +90,18 @@ pub fn pp_phys_plan_ext(p: &dyn ExecutionPlan, o: &PPOptions) -> String {
 }
 
 pub fn pp_plan(p: &LogicalPlan) -> String {
-    pp_plan_ext(p, &PPOptions::default())
+    pp_plan_ext(p, &PPOptions::none())
+}
+
+pub fn pp_query_plan_ext(qp: &QueryPlan, o: &PPOptions) -> String {
+    pp_plan_ext(match qp {
+        QueryPlan::Meta(p) => p,
+        QueryPlan::Select(pre_serialized_plan, _) => pre_serialized_plan.logical_plan()
+    }, o)
+}
+
+pub fn pp_query_plan(p: &QueryPlan) -> String {
+    pp_query_plan_ext(p, &PPOptions::none())
 }
 
 pub fn pp_plan_ext(p: &LogicalPlan, opts: &PPOptions) -> String {
@@ -178,7 +186,7 @@ pub fn pp_plan_ext(p: &LogicalPlan, opts: &PPOptions) -> String {
             }
         }
         LogicalPlan::Union(Union { schema, .. }) => {
-            self.output += &format!("Union, schema: {}", schema)
+            self.output += &format!("Union, schema: {}", pp_df_schema(schema.as_ref()))
         }
         LogicalPlan::Join(Join { on, .. }) => {
             self.output += &format!(
@@ -372,8 +380,8 @@ pub fn pp_plan_ext(p: &LogicalPlan, opts: &PPOptions) -> String {
             }
         }
 
-        if self.opts.debug_schema {
-            self.output += &format!(", debug_schema: {:?}", plan.schema());
+        if self.opts.show_schema {
+            self.output += &format!(", schema: {}", pp_df_schema(plan.schema().as_ref()));
        }
 
         if !saw_expected_topk_lower {
@@ -475,6 +483,8 @@ fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, ou
         }
         out.extend(repeat_n(' ', indent));
 
+        let mut skip_show_partitions = false;
+
         let a = p.as_any();
         if let Some(t) = a.downcast_ref::<CubeTableExec>() {
             *out += &format!("Scan, index: {}", pp_index(&t.index_snapshot));
@@ -588,6 +598,7 @@ fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, ou
                     })
                     .join(", ")
             );
+            skip_show_partitions = true;
         } else if let Some(topk) = a.downcast_ref::<AggregateTopKExec>() {
             *out += &format!("AggregateTopK, limit: {:?}", topk.limit);
             if o.show_aggregations {
@@ -661,14 +672,6 @@ fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, ou
             *out += &to_string.split(" ").next().unwrap_or(&to_string);
         }
 
-        // TODO upgrade DF - remove
-        // *out += &format!(", schema: {}", p.schema());
-        // *out += &format!(
-        //     ", partitions: {}, output_ordering: {:?}",
-        //     p.properties().partitioning.partition_count(),
-        //     p.output_ordering()
-        // );
-
         if o.show_output_hints {
             let properties: &PlanProperties = p.properties();
 
@@ -728,8 +731,12 @@ fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, ou
             }
         }
 
-        if o.debug_schema {
-            *out += &format!(", debug_schema: {:?}", p.schema());
+        if o.show_schema {
+            *out += &format!(", schema: {}", pp_schema(p.schema().as_ref()));
+        }
+
+        if o.show_partitions && !skip_show_partitions {
+            *out += &format!(", partitions: {}", p.properties().output_partitioning().partition_count());
         }
     }
 }
@@ -752,3 +759,15 @@ fn pp_row_range(r: &RowRange) -> String {
 fn pp_exprs(v: &Vec<Expr>) -> String {
     "[".to_owned() + &v.iter().map(|e: &Expr| format!("{}", e)).join(", ") + "]"
 }
+
+fn pp_df_schema(schema: &DFSchema) -> String {
+    // Like pp_schema but with qualifiers.
+    format!("{}", schema)
+}
+
+fn pp_schema(schema: &Schema) -> String {
+    // Mimicking DFSchema's Display
+    format!("fields:[{}], metadata:{:?}",
+        schema.fields.iter().map(|f| f.name()).join(", "),
+        schema.metadata)
+}
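As a rough illustration of how the renamed and added printer options are meant to be exercised in plan tests (a sketch only, not part of this commit; the helper name and the `physical_plan` argument are hypothetical):

    // Hypothetical test helper using the new PPOptions flags.
    use datafusion::physical_plan::ExecutionPlan;
    use crate::queryplanner::pretty_printers::{pp_phys_plan_ext, PPOptions};

    fn assert_shows_partitions(physical_plan: &dyn ExecutionPlan) {
        // everything() now enables show_schema and show_partitions as well.
        let printed = pp_phys_plan_ext(physical_plan, &PPOptions::everything());
        // With show_partitions set, node lines gain a ", partitions: N" suffix,
        // except where a node already prints its partitions and sets
        // skip_show_partitions.
        assert!(printed.contains(", partitions: "));
    }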

rust/cubestore/cubestore/src/queryplanner/query_executor.rs

Lines changed: 1 addition & 1 deletion

@@ -1692,7 +1692,7 @@ impl ExecutionPlan for ClusterSendExec {
     }
 
     fn required_input_distribution(&self) -> Vec<Distribution> {
-        // TODO: If this is in place, and it is obeyed (with EnforceDistribution?), then we don't need to use a CoalescePartitions node in worker exec.
+        // TODO: Ensure this is obeyed... or allow worker partitions to be sent separately.
        vec![Distribution::SinglePartition; self.children().len()]
     }
 }
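For readers following the TODO above: `Distribution::SinglePartition` tells DataFusion's distribution enforcement that each child of `ClusterSendExec` must be delivered as a single partition. A minimal sketch of what an enforcement pass does with such a requirement (illustrative only; the real logic lives in DataFusion's EnforceDistribution rule, and this helper is not part of the commit):

    use std::sync::Arc;
    use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
    use datafusion::physical_plan::{Distribution, ExecutionPlan};

    // Coalesce a multi-partition child into one stream when the parent
    // requires Distribution::SinglePartition; otherwise leave it untouched.
    fn coalesce_if_needed(
        required: &Distribution,
        child: Arc<dyn ExecutionPlan>,
    ) -> Arc<dyn ExecutionPlan> {
        let parts = child.properties().output_partitioning().partition_count();
        match required {
            Distribution::SinglePartition if parts > 1 => {
                Arc::new(CoalescePartitionsExec::new(child))
            }
            _ => child,
        }
    }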
