Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
mod composed_extension_codec;
#[allow(unused)]
pub mod ttl_map;
pub mod util;

pub(crate) use composed_extension_codec::ComposedPhysicalExtensionCodec;
36 changes: 0 additions & 36 deletions src/common/util.rs

This file was deleted.

166 changes: 83 additions & 83 deletions src/physical_optimizer.rs

Large diffs are not rendered by default.

7 changes: 1 addition & 6 deletions src/plan/arrow_flight_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,7 @@ impl ArrowFlightReadExec {

impl DisplayAs for ArrowFlightReadExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
match self {
ArrowFlightReadExec::Pending(_) => write!(f, "ArrowFlightReadExec"),
ArrowFlightReadExec::Ready(v) => {
write!(f, "ArrowFlightReadExec: Stage {:<3}", v.stage_num)
}
}
write!(f, "ArrowFlightReadExec")
}
}

Expand Down
6 changes: 1 addition & 5 deletions src/plan/isolator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,7 @@ impl PartitionIsolatorExec {

impl DisplayAs for PartitionIsolatorExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
write!(
f,
"PartitionIsolatorExec [providing upto {} partitions]",
self.partition_count
)
write!(f, "PartitionIsolatorExec",)
}
}

Expand Down
96 changes: 75 additions & 21 deletions src/stage/display.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
use super::ExecutionStage;
use crate::plan::PartitionIsolatorExec;
use crate::{
task::{format_pg, ExecutionTask},
ArrowFlightReadExec,
};
use datafusion::physical_plan::{displayable, ExecutionPlan, ExecutionPlanProperties};
use datafusion::{
error::Result,
physical_plan::{DisplayAs, DisplayFormatType},
};
use itertools::Itertools;
/// Be able to display a nice tree for stages.
///
/// The challenge to doing this at the moment is that `TreeRenderVisitor`
Expand All @@ -12,24 +24,47 @@
/// the Stage tree.
use std::fmt::Write;

use datafusion::{
error::Result,
physical_plan::{DisplayAs, DisplayFormatType},
};

use crate::{
common::util::display_plan_with_partition_in_out,
task::{format_pg, ExecutionTask},
};

use super::ExecutionStage;

// Unicode box-drawing characters for creating borders and connections.
const LTCORNER: &str = "┌"; // Left top corner
const LDCORNER: &str = "└"; // Left bottom corner
const VERTICAL: &str = "│"; // Vertical line
const HORIZONTAL: &str = "─"; // Horizontal line

impl ExecutionStage {
    /// Appends a one-line rendering of `plan`, followed by its children, to `f`.
    ///
    /// Each node is written on its own line, prefixed by `indent` spaces plus
    /// one separating space. Two node kinds receive extra annotations:
    ///
    /// * a ready `ArrowFlightReadExec` is annotated with the stage it reads
    ///   from, its output partition count and the number of tasks in that
    ///   input stage;
    /// * a `PartitionIsolatorExec` is annotated with the per-task partition
    ///   layout of this stage.
    ///
    /// Children are rendered recursively with `indent + 2`.
    ///
    /// # Errors
    ///
    /// Propagates any `std::fmt::Error` produced while writing into `f`.
    fn format(&self, plan: &dyn ExecutionPlan, indent: usize, f: &mut String) -> std::fmt::Result {
        let rendered = displayable(plan).one_line().to_string();
        // The one-line rendering is expected to end with a newline; strip only
        // that terminator so a rendering without one does not lose a character.
        let node_str = rendered.strip_suffix('\n').unwrap_or(&rendered);
        write!(f, "{:indent$} {node_str}", "")?;

        if let Some(ArrowFlightReadExec::Ready(ready)) =
            plan.as_any().downcast_ref::<ArrowFlightReadExec>()
        {
            let stage = ready.stage_num;
            let Some(input_stage) = self.child_stages_iter().find(|v| v.num == stage) else {
                // The referenced stage is not among this stage's children.
                // Report it (it is a stage number, not a partition number) and
                // stop descending from this node.
                writeln!(f, " Wrong stage number {stage}")?;
                return Ok(());
            };
            let tasks = input_stage.tasks.len();
            let partitions = plan.output_partitioning().partition_count();
            write!(
                f,
                " input_stage={stage}, input_partitions={partitions}, input_tasks={tasks}",
            )?;
        }

        if plan.as_any().is::<PartitionIsolatorExec>() {
            write!(f, " {}", format_tasks_for_partition_isolator(&self.tasks))?;
        }
        writeln!(f)?;

        for child in plan.children() {
            self.format(child.as_ref(), indent + 2, f)?;
        }
        Ok(())
    }
}

impl DisplayAs for ExecutionStage {
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
#[allow(clippy::format_in_format_args)]
Expand All @@ -44,10 +79,11 @@ impl DisplayAs for ExecutionStage {
LTCORNER,
HORIZONTAL.repeat(5),
format!(" {} ", self.name),
format_tasks(&self.tasks),
format_tasks_for_stage(&self.tasks),
)?;
let plan_str = display_plan_with_partition_in_out(self.plan.as_ref())
.map_err(|_| std::fmt::Error {})?;

let mut plan_str = String::new();
self.format(self.plan.as_ref(), 0, &mut plan_str)?;
let plan_str = plan_str
.split('\n')
.filter(|v| !v.is_empty())
Expand Down Expand Up @@ -186,10 +222,28 @@ pub fn display_stage_graphviz(stage: &ExecutionStage) -> Result<String> {
Ok(f)
}

fn format_tasks(tasks: &[ExecutionTask]) -> String {
tasks
.iter()
.map(|task| format!("{task}"))
.collect::<Vec<String>>()
.join(",")
/// Builds the task summary shown in a stage header.
///
/// The result starts with `"Tasks: "` and is followed, for every task in
/// order, by `t<index>:[p<a>,p<b>,...] ` where the `p` entries are the values
/// of that task's `partition_group`. Each task entry ends with a single space.
fn format_tasks_for_stage(tasks: &[ExecutionTask]) -> String {
    let entries: String = tasks
        .iter()
        .enumerate()
        .map(|(task_idx, task)| {
            let group = task
                .partition_group
                .iter()
                .map(|partition| format!("p{partition}"))
                .join(",");
            format!("t{task_idx}:[{group}] ")
        })
        .collect();
    format!("Tasks: {entries}")
}

/// Builds the per-task partition layout shown next to a `PartitionIsolatorExec`.
///
/// The result starts with `"Tasks: "` and is followed, for every task in
/// order, by `t<index>:[...] `. Inside the brackets there is one
/// comma-separated slot per partition: a slot reads `p<k>` when the partition
/// at that position is the `k`-th entry of the task's `partition_group`, and
/// `__` otherwise.
///
/// The number of slots is the total number of partitions across all tasks,
/// widened when necessary so that the largest partition id still has a slot.
/// Without the widening, a partition id at or beyond the total count would
/// index out of bounds and panic.
fn format_tasks_for_partition_isolator(tasks: &[ExecutionTask]) -> String {
    let total_partitions: usize = tasks.iter().map(|t| t.partition_group.len()).sum();
    // Smallest slot count able to hold the largest partition id seen.
    let needed_for_largest_id = tasks
        .iter()
        .flat_map(|t| t.partition_group.iter())
        .map(|p| (*p as usize) + 1)
        .max()
        .unwrap_or(0);
    let slot_count = total_partitions.max(needed_for_largest_id);

    let mut result = "Tasks: ".to_string();
    for (task_idx, task) in tasks.iter().enumerate() {
        // Start every row from placeholders, then mark the slots this task owns.
        let mut row = vec!["__".to_string(); slot_count];
        for (local_idx, partition) in task.partition_group.iter().enumerate() {
            row[*partition as usize] = format!("p{local_idx}");
        }
        result += &format!("t{task_idx}:[{}] ", row.join(","));
    }
    result
}
24 changes: 12 additions & 12 deletions tests/custom_extension_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,20 @@ mod tests {
DistributedPhysicalOptimizerRule::default().distribute_plan(distributed_plan)?;

assert_snapshot!(displayable(&distributed_plan).indent(true).to_string(), @r"
┌───── Stage 3 Task: partitions: 0,unassigned]
partitions [out:1 <-- in:1 ] SortExec: expr=[numbers@0 DESC NULLS LAST], preserve_partitioning=[false]
partitions [out:1 <-- in:10 ] RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=10
partitions [out:10 ] ArrowFlightReadExec: Stage 2
┌───── Stage 3 Tasks: t0:[p0]
│ SortExec: expr=[numbers@0 DESC NULLS LAST], preserve_partitioning=[false]
│ RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=10
ArrowFlightReadExec input_stage=2, input_partitions=10, input_tasks=1
└──────────────────────────────────────────────────
┌───── Stage 2 Task: partitions: 0..9,unassigned]
partitions [out:10 <-- in:1 ] RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
partitions [out:1 <-- in:1 ] SortExec: expr=[numbers@0 DESC NULLS LAST], preserve_partitioning=[false]
partitions [out:1 ] ArrowFlightReadExec: Stage 1
┌───── Stage 2 Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9]
│ RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
│ SortExec: expr=[numbers@0 DESC NULLS LAST], preserve_partitioning=[false]
ArrowFlightReadExec input_stage=1, input_partitions=1, input_tasks=1
└──────────────────────────────────────────────────
┌───── Stage 1 Task: partitions: 0,unassigned]
partitions [out:1 <-- in:1 ] RepartitionExec: partitioning=Hash([numbers@0], 1), input_partitions=1
partitions [out:1 <-- in:1 ] FilterExec: numbers@0 > 1
partitions [out:1 ] Int64ListExec: length=6
┌───── Stage 1 Tasks: t0:[p0]
│ RepartitionExec: partitioning=Hash([numbers@0], 1), input_partitions=1
│ FilterExec: numbers@0 > 1
│ Int64ListExec: length=6
└──────────────────────────────────────────────────
");

Expand Down
34 changes: 17 additions & 17 deletions tests/distributed_aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,25 +50,25 @@ mod tests {

assert_snapshot!(physical_distributed_str,
@r"
┌───── Stage 3 Task: partitions: 0,unassigned]
partitions [out:1 <-- in:1 ] ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday]
partitions [out:1 <-- in:3 ] SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
partitions [out:3 <-- in:3 ] SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
partitions [out:3 <-- in:3 ] ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
partitions [out:3 <-- in:3 ] AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
partitions [out:3 <-- in:3 ] CoalesceBatchesExec: target_batch_size=8192
partitions [out:3 ] ArrowFlightReadExec: Stage 2
┌───── Stage 3 Tasks: t0:[p0]
│ ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday]
│ SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
│ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
│ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
│ CoalesceBatchesExec: target_batch_size=8192
ArrowFlightReadExec input_stage=2, input_partitions=3, input_tasks=3
└──────────────────────────────────────────────────
┌───── Stage 2 Task: partitions: 0,unassigned],Task: partitions: 1,unassigned],Task: partitions: 2,unassigned]
partitions [out:3 <-- in:1 ] RepartitionExec: partitioning=Hash([RainToday@0], 3), input_partitions=1
partitions [out:1 <-- in:3 ] PartitionIsolatorExec [providing upto 1 partitions]
partitions [out:3 ] ArrowFlightReadExec: Stage 1
┌───── Stage 2 Tasks: t0:[p0] t1:[p1] t2:[p2]
│ RepartitionExec: partitioning=Hash([RainToday@0], 3), input_partitions=1
PartitionIsolatorExec Tasks: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
ArrowFlightReadExec input_stage=1, input_partitions=3, input_tasks=3
└──────────────────────────────────────────────────
┌───── Stage 1 Task: partitions: 0,unassigned],Task: partitions: 1,unassigned],Task: partitions: 2,unassigned]
partitions [out:3 <-- in:1 ] RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
partitions [out:1 <-- in:1 ] PartitionIsolatorExec [providing upto 1 partitions]
partitions [out:1 <-- in:1 ] AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[RainToday], file_type=parquet
┌───── Stage 1 Tasks: t0:[p0] t1:[p1] t2:[p2]
│ RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
PartitionIsolatorExec Tasks: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
│ DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[RainToday], file_type=parquet
└──────────────────────────────────────────────────
",
);
Expand Down
23 changes: 12 additions & 11 deletions tests/highly_distributed_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ mod tests {
use std::sync::Arc;

#[tokio::test]
#[ignore] // This test is flaky
async fn highly_distributed_query() -> Result<(), Box<dyn Error>> {
let (ctx, _guard) = start_localhost_context(9, DefaultSessionBuilder).await;
register_parquet_tables(&ctx).await?;
Expand Down Expand Up @@ -45,20 +46,20 @@ mod tests {

assert_snapshot!(physical_distributed_str,
@r"
┌───── Stage 4 Task: partitions: 0..4,unassigned]
partitions [out:5 ] ArrowFlightReadExec: Stage 3
┌───── Stage 4 Tasks: t0:[p0,p1,p2,p3,p4]
ArrowFlightReadExec input_stage=3, input_partitions=5, input_tasks=1
└──────────────────────────────────────────────────
┌───── Stage 3 Task: partitions: 0..4,unassigned]
partitions [out:5 <-- in:10 ] RepartitionExec: partitioning=RoundRobinBatch(5), input_partitions=10
partitions [out:10 ] ArrowFlightReadExec: Stage 2
┌───── Stage 3 Tasks: t0:[p0,p1,p2,p3,p4]
│ RepartitionExec: partitioning=RoundRobinBatch(5), input_partitions=10
ArrowFlightReadExec input_stage=2, input_partitions=10, input_tasks=1
└──────────────────────────────────────────────────
┌───── Stage 2 Task: partitions: 0..9,unassigned]
partitions [out:10 <-- in:1 ] RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
partitions [out:1 ] ArrowFlightReadExec: Stage 1
┌───── Stage 2 Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9]
│ RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
ArrowFlightReadExec input_stage=1, input_partitions=1, input_tasks=1
└──────────────────────────────────────────────────
┌───── Stage 1 Task: partitions: 0,unassigned]
partitions [out:1 <-- in:1 ] RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=1
partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/flights-1m.parquet]]}, projection=[FL_DATE, DEP_DELAY, ARR_DELAY, AIR_TIME, DISTANCE, DEP_TIME, ARR_TIME], file_type=parquet
┌───── Stage 1 Tasks: t0:[p0]
│ RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=1
│ DataSourceExec: file_groups={1 group: [[/testdata/flights-1m.parquet]]}, projection=[FL_DATE, DEP_DELAY, ARR_DELAY, AIR_TIME, DISTANCE, DEP_TIME, ARR_TIME], file_type=parquet
└──────────────────────────────────────────────────
",
);
Expand Down
Loading