diff --git a/src/execution_plans/stage.rs b/src/execution_plans/stage.rs index fc1e45a..a032bad 100644 --- a/src/execution_plans/stage.rs +++ b/src/execution_plans/stage.rs @@ -1,6 +1,9 @@ use crate::channel_resolver_ext::get_distributed_channel_resolver; use crate::{ArrowFlightReadExec, ChannelResolver, PartitionIsolatorExec}; -use datafusion::common::internal_err; +use datafusion::common::{ + internal_err, + tree_node::{TreeNode, TreeNodeRecursion}, +}; use datafusion::error::{DataFusionError, Result}; use datafusion::execution::TaskContext; use datafusion::physical_plan::{ @@ -9,6 +12,7 @@ use datafusion::physical_plan::{ use datafusion::prelude::SessionContext; use itertools::Itertools; use rand::Rng; +use std::collections::VecDeque; use std::sync::Arc; use url::Url; use uuid::Uuid; @@ -18,8 +22,8 @@ use uuid::Uuid; /// It implements [`ExecutionPlan`] and can be executed to produce a /// stream of record batches. /// -/// An ExecutionTask is a finer grained unit of work compared to an ExecutionStage. -/// One ExecutionStage will create one or more ExecutionTasks +/// An ExecutionTask is a finer grained unit of work compared to an StageExec. +/// One StageExec will create one or more ExecutionTasks /// /// When an [`StageExec`] is execute()'d if will execute its plan and return a stream /// of record batches. @@ -101,7 +105,7 @@ pub struct ExecutionTask { } impl StageExec { - /// Creates a new `ExecutionStage` with the given plan and inputs. One task will be created + /// Creates a new `StageExec` with the given plan and inputs. One task will be created /// responsible for partitions in the plan. pub fn new( query_id: Uuid, @@ -151,7 +155,7 @@ impl StageExec { format!("Stage {:<3}", self.num) } - /// Returns an iterator over the child stages of this stage cast as &ExecutionStage + /// Returns an iterator over the child stages of this stage cast as &StageExec /// which can be useful pub fn child_stages_iter(&self) -> impl Iterator { self.inputs @@ -402,127 +406,383 @@ fn format_tasks_for_partition_isolator(tasks: &[ExecutionTask]) -> String { result } -pub fn display_stage_graphviz(stage: &StageExec) -> Result { - let mut f = String::new(); +// num_colors must agree with the colorscheme selected from +// https://graphviz.org/doc/info/colors.html +const NUM_COLORS: usize = 6; +const COLOR_SCHEME: &str = "spectral6"; - let num_colors = 5; // this should aggree with the colorscheme chosen from - // https://graphviz.org/doc/info/colors.html - let colorscheme = "spectral5"; - - writeln!(f, "digraph G {{")?; - writeln!(f, " node[shape=rect];")?; - writeln!(f, " rankdir=BT;")?; - writeln!(f, " ranksep=2;")?; - writeln!(f, " edge[colorscheme={},penwidth=2.0];", colorscheme)?; - - // we'll keep a stack of stage ref, parrent stage ref - let mut stack: Vec<(&StageExec, Option<&StageExec>)> = vec![(stage, None)]; - - while let Some((stage, parent)) = stack.pop() { - writeln!(f, " subgraph cluster_{} {{", stage.num)?; - writeln!(f, " node[shape=record];")?; - writeln!(f, " label=\"{}\";", stage.name())?; - writeln!(f, " labeljust=r;")?; - writeln!(f, " labelloc=b;")?; // this will put the label at the top as our - // rankdir=BT - - stage.tasks.iter().try_for_each(|task| { - let lab = task - .partition_group - .iter() - .map(|p| format!("{}", p, p)) - .collect::>() - .join("|"); - writeln!( - f, - " \"{}_{}\"[label = \"{}\"]", - stage.num, - format_partition_group(&task.partition_group), - lab, - )?; +pub fn display_stage_graphviz(plan: Arc) -> Result { + let mut f = String::new(); - if let Some(our_parent) = parent { - our_parent.tasks.iter().try_for_each(|ptask| { - task.partition_group.iter().try_for_each(|partition| { - ptask.partition_group.iter().try_for_each(|ppartition| { - writeln!( - f, - " \"{}_{}\":p{}:n -> \"{}_{}\":p{}:s[color={}]", - stage.num, - format_partition_group(&task.partition_group), - partition, - our_parent.num, - format_partition_group(&ptask.partition_group), - ppartition, - (partition) % num_colors + 1 - ) - }) - }) - })?; + writeln!( + f, + "digraph G {{ + rankdir=BT + edge[colorscheme={}, penwidth=2.0] + splines=false +", + COLOR_SCHEME + )?; + + // draw all tasks first + plan.apply(|node| { + let stage = node + .as_any() + .downcast_ref::() + .expect("Expected StageExec"); + for task in stage.tasks.iter() { + let partition_group = &task.partition_group; + let p = display_single_task(stage, partition_group)?; + writeln!(f, "{}", p)?; + } + Ok(TreeNodeRecursion::Continue) + })?; + + // now draw edges between the tasks + + plan.apply(|node| { + let stage = node + .as_any() + .downcast_ref::() + .expect("Expected StageExec"); + + for child_stage in stage.child_stages_iter() { + for task in stage.tasks.iter() { + for child_task in child_stage.tasks.iter() { + let edges = display_inter_task_edges(stage, task, child_stage, child_task)?; + writeln!( + f, + "// edges from child stage {} task {} to stage {} task {}\n {}", + child_stage.num, + format_pg(&child_task.partition_group), + stage.num, + format_pg(&task.partition_group), + edges + )?; + } } + } - Ok::<(), std::fmt::Error>(()) - })?; + Ok(TreeNodeRecursion::Continue) + })?; - // now we try to force the left right nature of tasks to be honored - writeln!(f, " {{")?; - writeln!(f, " rank = same;")?; - stage.tasks.iter().try_for_each(|task| { - writeln!( - f, - " \"{}_{}\"", - stage.num, - format_partition_group(&task.partition_group) - )?; + writeln!(f, "}}")?; - Ok::<(), std::fmt::Error>(()) - })?; - writeln!(f, " }}")?; - // combined with rank = same, the invisible edges will force the tasks to be - // laid out in a single row within the stage - for i in 0..stage.tasks.len() - 1 { - writeln!( - f, - " \"{}_{}\":w -> \"{}_{}\":e[style=invis]", - stage.num, - format_partition_group(&stage.tasks[i].partition_group), - stage.num, - format_partition_group(&stage.tasks[i + 1].partition_group), - )?; + Ok(f) +} + +pub fn display_single_task(stage: &StageExec, partition_group: &[usize]) -> Result { + let mut f = String::new(); + writeln!( + f, + " + subgraph \"cluster_stage_{}_task_{}_margin\" {{ + style=invis + margin=20.0 + subgraph \"cluster_stage_{}_task_{}\" {{ + color=blue + style=dotted + label = \"Stage {} Task Partitions {}\" + labeljust=r + labelloc=b + + node[shape=none] + +", + stage.num, + format_pg(partition_group), + stage.num, + format_pg(partition_group), + stage.num, + format_pg(partition_group) + )?; + + // draw all plans + // we need to label the nodes including depth to uniquely identify them within this task + // the tree node API provides depth first traversal, but we need breadth to align with + // how we will draw edges below, so we'll do that. + let mut queue = VecDeque::from([&stage.plan]); + let mut index = 0; + while let Some(plan) = queue.pop_front() { + index += 1; + let p = display_single_plan(plan.as_ref(), stage.num, partition_group, index)?; + writeln!(f, "{}", p)?; + for child in plan.children().iter() { + queue.push_back(child); } + } - // add a node for the plan, its way too big! Alternatives to add it? - /*writeln!( - f, - " \"{}_plan\"[label = \"{}\", shape=box];", - stage.num, - displayable(stage.plan.as_ref()).indent(false) - )?; - */ + // draw edges between the plan nodes + type PlanWithParent<'a> = ( + &'a Arc, + Option<&'a Arc>, + usize, + ); + let mut queue: VecDeque = VecDeque::from([(&stage.plan, None, 0usize)]); + let mut found_isolator = false; + index = 0; + while let Some((plan, maybe_parent, parent_idx)) = queue.pop_front() { + index += 1; + if plan + .as_any() + .downcast_ref::() + .is_some() + { + found_isolator = true; + } + if let Some(parent) = maybe_parent { + let output_partitions = plan.output_partitioning().partition_count(); + + let partitions = if plan + .as_any() + .downcast_ref::() + .is_some() + || plan + .as_any() + .downcast_ref::() + .is_some() + { + output_partitions + } else if let Some(child) = plan.children().first() { + child.output_partitioning().partition_count() + } else { + output_partitions + }; - writeln!(f, " }}")?; + for i in 0..partitions { + let mut style = ""; + if plan + .as_any() + .downcast_ref::() + .is_some() + { + if i >= partition_group.len() { + style = "[style=dotted, label=empty]"; + } + } else if found_isolator && !partition_group.contains(&i) { + style = "[style=invis]"; + } - for child in stage.child_stages_iter() { - stack.push((child, Some(stage))); + writeln!( + f, + " {}_{}_{}_{}:t{}:n -> {}_{}_{}_{}:b{}:s {}[color={}]", + plan.name(), + stage.num, + node_format_pg(partition_group), + index, + i, + parent.name(), + stage.num, + node_format_pg(partition_group), + parent_idx, + i, + style, + i % NUM_COLORS + 1 + )?; + } + } + + for child in plan.children().iter() { + queue.push_back((child, Some(plan), index)); } } + writeln!(f, " }}")?; + writeln!(f, " }}")?; - writeln!(f, "}}")?; Ok(f) } -pub fn format_partition_group(partition_group: &[usize]) -> String { - if partition_group.len() > 2 { - format!( - "{}..{}", - partition_group[0], - partition_group[partition_group.len() - 1] - ) +/// We want to display a single plan as a three row table with the top and bottom being +/// graphvis ports. +/// +/// We accept an index to make the node name unique in the graphviz output within +/// a plan at the same depth +/// +/// An example of such a node would be: +/// +/// ```text +/// ArrowFlightReadExec [label=< +/// +/// +/// +/// +/// +/// +/// +/// +/// +/// +///
+/// +/// +/// +/// +/// +///
+///
+/// +/// +/// +/// +///
ArrowFlightReadExec
+///
+/// +/// +/// +/// +/// +///
+///
+/// >]; +/// ``` +pub fn display_single_plan( + plan: &dyn ExecutionPlan, + stage_num: usize, + partition_group: &[usize], + index: usize, +) -> Result { + let mut f = String::new(); + let output_partitions = plan.output_partitioning().partition_count(); + let input_partitions = if let Some(child) = plan.children().first() { + child.output_partitioning().partition_count() + } else if plan + .as_any() + .downcast_ref::() + .is_some() + { + output_partitions } else { - partition_group - .iter() - .map(|pg| format!("{pg}")) - .collect::>() - .join(",") + 1 + }; + + writeln!( + f, + " + {}_{}_{}_{} [label=< + + + + + + + + + + +
+ + ", + plan.name(), + stage_num, + node_format_pg(partition_group), + index + )?; + + for i in 0..output_partitions { + writeln!(f, " ", i)?; } + + writeln!( + f, + " +
+
+ + + + +
{}
+
+ + ", + plan.name() + )?; + + for i in 0..input_partitions { + writeln!(f, " ", i)?; + } + + writeln!( + f, + " +
+
+ >]; +" + )?; + Ok(f) +} + +fn display_inter_task_edges( + stage: &StageExec, + task: &ExecutionTask, + child_stage: &StageExec, + child_task: &ExecutionTask, +) -> Result { + let mut f = String::new(); + + let mut found_isolator = false; + let mut queue = VecDeque::from([&stage.plan]); + let mut index = 0; + while let Some(plan) = queue.pop_front() { + index += 1; + if plan + .as_any() + .downcast_ref::() + .is_some() + { + found_isolator = true; + } + if plan + .as_any() + .downcast_ref::() + .is_some() + { + // draw the edges to this node pulling data up from its child + for p in 0..plan.output_partitioning().partition_count() { + let mut style = ""; + if found_isolator && !task.partition_group.contains(&p) { + style = "[style=invis]"; + } + + writeln!( + f, + " {}_{}_{}_{}:t{}:n -> {}_{}_{}_{}:b{}:s {} [color={}]", + child_stage.plan.name(), + child_stage.num, + node_format_pg(&child_task.partition_group), + 1, // the repartition exec is always the first node in the plan + p, + plan.name(), + stage.num, + node_format_pg(&task.partition_group), + index, + p, + style, + p % NUM_COLORS + 1 + )?; + } + } + for child in plan.children().iter() { + queue.push_back(child); + } + } + + Ok(f) +} + +fn format_pg(partition_group: &[usize]) -> String { + partition_group + .iter() + .map(|pg| format!("{pg}")) + .collect::>() + .join("_") +} + +fn node_format_pg(partition_group: &[usize]) -> String { + partition_group + .iter() + .map(|pg| format!("{pg}")) + .collect::>() + .join("_") } diff --git a/src/stage/display.rs b/src/stage/display.rs new file mode 100644 index 0000000..3ea363c --- /dev/null +++ b/src/stage/display.rs @@ -0,0 +1,428 @@ +/// Be able to display a nice tree for stages. +/// +/// The challenge to doing this at the moment is that `TreeRenderVistor` +/// in [`datafusion::physical_plan::display`] is not public, and that it also +/// is specific to a `ExecutionPlan` trait object, which we don't have. +/// +/// TODO: try to upstream a change to make rendering of Trees (logical, physical, stages) against +/// a generic trait rather than a specific trait object. This would allow us to +/// use the same rendering code for all trees, including stages. +/// +/// In the meantime, we can make a dummy ExecutionPlan that will let us render +/// the Stage tree. +use std::{collections::VecDeque, fmt::Write, sync::Arc}; + +use datafusion::{ + common::tree_node::{TreeNode, TreeNodeRecursion}, + error::Result, + physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties}, +}; + +use crate::{ + common::util::display_plan_with_partition_in_out, + plan::PartitionIsolatorExec, + task::{format_pg, ExecutionTask}, + ArrowFlightReadExec, +}; + +use super::ExecutionStage; + +// Unicode box-drawing characters for creating borders and connections. +const LTCORNER: &str = "┌"; // Left top corner +const LDCORNER: &str = "└"; // Left bottom corner +const VERTICAL: &str = "│"; // Vertical line +const HORIZONTAL: &str = "─"; // Horizontal line + +// num_colors must agree with the colorscheme selected from +// https://graphviz.org/doc/info/colors.html +const NUM_COLORS: usize = 6; +const COLOR_SCHEME: &str = "spectral6"; + +impl DisplayAs for ExecutionStage { + fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { + #[allow(clippy::format_in_format_args)] + match t { + DisplayFormatType::Default => { + write!(f, "{}", self.name) + } + DisplayFormatType::Verbose => { + writeln!( + f, + "{}{}{}{}", + LTCORNER, + HORIZONTAL.repeat(5), + format!(" {} ", self.name), + format_tasks(&self.tasks), + )?; + let plan_str = display_plan_with_partition_in_out(self.plan.as_ref()) + .map_err(|_| std::fmt::Error {})?; + let plan_str = plan_str + .split('\n') + .filter(|v| !v.is_empty()) + .collect::>() + .join(&format!("\n{}{}", " ".repeat(self.depth), VERTICAL)); + writeln!(f, "{}{}{}", " ".repeat(self.depth), VERTICAL, plan_str)?; + write!( + f, + "{}{}{}", + " ".repeat(self.depth), + LDCORNER, + HORIZONTAL.repeat(50) + )?; + + Ok(()) + } + DisplayFormatType::TreeRender => write!( + f, + "{}", + self.tasks + .iter() + .map(|task| format!("{task}")) + .collect::>() + .join("\n") + ), + } + } +} + +pub fn display_stage_graphviz(plan: Arc) -> Result { + let mut f = String::new(); + + writeln!( + f, + "digraph G {{ + rankdir=BT + edge[colorscheme={}, penwidth=2.0] + splines=false +", + COLOR_SCHEME + )?; + + // draw all tasks first + plan.apply(|node| { + let stage = node + .as_any() + .downcast_ref::() + .expect("Expected ExecutionStage"); + for task in stage.tasks.iter() { + let partition_group = &task.partition_group; + let p = display_single_task(stage, partition_group)?; + writeln!(f, "{}", p)?; + } + Ok(TreeNodeRecursion::Continue) + })?; + + // now draw edges between the tasks + + plan.apply(|node| { + let stage = node + .as_any() + .downcast_ref::() + .expect("Expected ExecutionStage"); + + for child_stage in stage.child_stages_iter() { + for task in stage.tasks.iter() { + for child_task in child_stage.tasks.iter() { + let edges = display_inter_task_edges(stage, task, child_stage, child_task)?; + writeln!(f, "{}", edges)?; + } + } + } + + Ok(TreeNodeRecursion::Continue) + })?; + + writeln!(f, "}}")?; + + Ok(f) +} + +pub fn display_single_task(stage: &ExecutionStage, partition_group: &[u64]) -> Result { + let mut f = String::new(); + writeln!( + f, + " + subgraph \"cluster_stage_{}_task_{}_margin\" {{ + style=invis + margin=20.0 + subgraph \"cluster_stage_{}_task_{}\" {{ + color=blue + style=dotted + label = \"Stage {} Task Partitions {}\" + labeljust=r + labelloc=b + + node[shape=none] + +", + stage.num, + format_pg(partition_group), + stage.num, + format_pg(partition_group), + stage.num, + format_pg(partition_group) + )?; + + // draw all plans + // we need to label the nodes including depth to uniquely identify them within this task + // the tree node API provides depth first traversal, but we need breadth to align with + // how we will draw edges below, so we'll do that. + let mut queue = VecDeque::from([(&stage.plan, 0, 0)]); + while let Some((plan, depth, index)) = queue.pop_front() { + let p = display_single_plan(plan.as_ref(), stage.num, partition_group, depth, index)?; + writeln!(f, "{}", p)?; + for (i, child) in plan.children().iter().enumerate() { + queue.push_back((child, depth + 1, i)); + } + } + + // draw edges between the plan nodes + queue = VecDeque::from([(&stage.plan, 0, 0)]); + let mut found_isolator = false; + while let Some((plan, depth, index)) = queue.pop_front() { + if plan + .as_any() + .downcast_ref::() + .is_some() + { + found_isolator = true; + } + for (child_index, child) in plan.children().iter().enumerate() { + let partitions = child.output_partitioning().partition_count(); + for i in 0..partitions { + let mut style = ""; + if child + .as_any() + .downcast_ref::() + .is_some() + && i >= partition_group.len() + { + style = "[style=dotted, label=empty]"; + } else if found_isolator && !partition_group.contains(&(i as u64)) { + style = "[style=invis]"; + } + + writeln!( + f, + " {}_{}_{}_{}_{}:t{}:n -> {}_{}_{}_{}_{}:b{}:s {}[color={}]", + child.name(), + stage.num, + node_format_pg(partition_group), + depth + 1, + child_index, + i, + plan.name(), + stage.num, + node_format_pg(partition_group), + depth, + index, + i, + style, + i % NUM_COLORS + 1 + )?; + } + queue.push_back((child, depth + 1, child_index)); + } + } + + writeln!(f, " }}")?; + writeln!(f, " }}")?; + + Ok(f) +} + +/// We want to display a single plan as a three row table with the top and bottom being +/// graphvis ports. +/// +/// We accept an index to make the node name unique in the graphviz output within +/// a plan at the same depth +/// +/// An example of such a node would be: +/// +/// ```text +/// ArrowFlightReadExec [label=< +/// +/// +/// +/// +/// +/// +/// +/// +/// +/// +///
+/// +/// +/// +/// +/// +///
+///
+/// +/// +/// +/// +///
ArrowFlightReadExec
+///
+/// +/// +/// +/// +/// +///
+///
+/// >]; +/// ``` +pub fn display_single_plan( + plan: &dyn ExecutionPlan, + stage_num: usize, + partition_group: &[u64], + depth: usize, + index: usize, +) -> Result { + let mut f = String::new(); + let output_partitions = plan.output_partitioning().partition_count(); + let input_partitions = if let Some(child) = plan.children().first() { + child.output_partitioning().partition_count() + } else if plan + .as_any() + .downcast_ref::() + .is_some() + { + output_partitions + } else { + 1 + }; + + writeln!( + f, + " + {}_{}_{}_{}_{} [label=< + + + + + + + + + + +
+ + ", + plan.name(), + stage_num, + node_format_pg(partition_group), + depth, + index + )?; + + for i in 0..output_partitions { + writeln!(f, " ", i)?; + } + + writeln!( + f, + " +
+
+ + + + +
{}
+
+ + ", + plan.name() + )?; + + for i in 0..input_partitions { + writeln!(f, " ", i)?; + } + + writeln!( + f, + " +
+
+ >]; +" + )?; + Ok(f) +} + +fn display_inter_task_edges( + stage: &ExecutionStage, + task: &ExecutionTask, + child_stage: &ExecutionStage, + child_task: &ExecutionTask, +) -> Result { + let mut f = String::new(); + + let mut found_isolator = false; + let mut queue = VecDeque::from([(&stage.plan, 0, 0)]); + while let Some((plan, depth, index)) = queue.pop_front() { + if plan + .as_any() + .downcast_ref::() + .is_some() + { + found_isolator = true; + } + if plan + .as_any() + .downcast_ref::() + .is_some() + { + // draw the edges to this node pulling data up from its child + for p in 0..plan.output_partitioning().partition_count() { + let mut style = ""; + if found_isolator && !task.partition_group.contains(&(p as u64)) { + style = "[style=invis]"; + } + + writeln!( + f, + " {}_{}_{}_{}_0:t{}:n -> {}_{}_{}_{}_{}:b{}:s {} [color={}]", + child_stage.plan.name(), + child_stage.num, + node_format_pg(&child_task.partition_group), + 0, + p, + plan.name(), + stage.num, + node_format_pg(&task.partition_group), + depth, + index, + p, + style, + p % NUM_COLORS + 1 + )?; + } + } + for (child_index, child) in plan.children().iter().enumerate() { + queue.push_back((child, depth + 1, child_index)); + } + } + + Ok(f) +} + +fn format_tasks(tasks: &[ExecutionTask]) -> String { + tasks + .iter() + .map(|task| format!("{task}")) + .collect::>() + .join(",") +} + +fn node_format_pg(partition_group: &[u64]) -> String { + partition_group + .iter() + .map(|pg| format!("{pg}")) + .collect::>() + .join("_") +} diff --git a/tests/.distributed_aggregation.rs.pending-snap b/tests/.distributed_aggregation.rs.pending-snap new file mode 100644 index 0000000..7780d99 --- /dev/null +++ b/tests/.distributed_aggregation.rs.pending-snap @@ -0,0 +1 @@ +{"run_id":"1756400899-521717000","line":32,"new":{"module_name":"distributed_aggregation__tests","snapshot_name":"distributed_aggregation","metadata":{"source":"tests/distributed_aggregation.rs","assertion_line":32,"expression":"physical_str"},"snapshot":"ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday]\n SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]\n SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]\n ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]\n AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([RainToday@0], 10), input_partitions=10\n RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1\n AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]\n DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[RainToday], file_type=parquet"},"old":{"module_name":"distributed_aggregation__tests","metadata":{},"snapshot":"ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday]\n SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]\n SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]\n ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]\n AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([RainToday@0], 3), input_partitions=3\n RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1\n AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]\n DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[RainToday], file_type=parquet"}}