
Commit 80caee4

Improve explain
1 parent b900fec commit 80caee4

10 files changed, +826 -829 lines changed

src/common/mod.rs

Lines changed: 0 additions & 1 deletion
@@ -1,6 +1,5 @@
 mod composed_extension_codec;
 #[allow(unused)]
 pub mod ttl_map;
-pub mod util;
 
 pub(crate) use composed_extension_codec::ComposedPhysicalExtensionCodec;

src/common/util.rs

Lines changed: 0 additions & 36 deletions
This file was deleted.

src/physical_optimizer.rs

Lines changed: 83 additions & 83 deletions
Large diffs are not rendered by default.

src/plan/arrow_flight_read.rs

Lines changed: 1 addition & 6 deletions
@@ -102,12 +102,7 @@ impl ArrowFlightReadExec {
 
 impl DisplayAs for ArrowFlightReadExec {
     fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
-        match self {
-            ArrowFlightReadExec::Pending(_) => write!(f, "ArrowFlightReadExec"),
-            ArrowFlightReadExec::Ready(v) => {
-                write!(f, "ArrowFlightReadExec: Stage {:<3}", v.stage_num)
-            }
-        }
+        write!(f, "ArrowFlightReadExec")
     }
 }
 
src/plan/isolator.rs

Lines changed: 1 addition & 5 deletions
@@ -46,11 +46,7 @@ impl PartitionIsolatorExec {
 
 impl DisplayAs for PartitionIsolatorExec {
     fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
-        write!(
-            f,
-            "PartitionIsolatorExec [providing upto {} partitions]",
-            self.partition_count
-        )
+        write!(f, "PartitionIsolatorExec",)
     }
 }
 
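Both Display impls above are now reduced to the bare operator name. The stage-level detail they used to print ("Stage N" for ArrowFlightReadExec, "providing upto N partitions" for PartitionIsolatorExec) is instead appended by the new ExecutionStage::format method in src/stage/display.rs below, which downcasts each node and adds input_stage/input_partitions/input_tasks or the task list. The following is a minimal, self-contained sketch of the DataFusion DisplayAs pattern these impls follow; ToyReadExec and the OneLine adapter are hypothetical helpers written only for this illustration, not part of the crate.

use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use std::fmt::{self, Formatter};

// Hypothetical toy node: fmt_as emits only the operator name, mirroring the
// trimmed-down impls in this commit.
struct ToyReadExec;

impl DisplayAs for ToyReadExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
        write!(f, "ToyReadExec")
    }
}

// Small adapter so the one-line label can be printed with `{}`; the crate obtains
// the same label via `displayable(plan).one_line()` inside `ExecutionStage::format`.
struct OneLine<'a, T: DisplayAs>(&'a T);

impl<T: DisplayAs> fmt::Display for OneLine<'_, T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        self.0.fmt_as(DisplayFormatType::Default, f)
    }
}

fn main() {
    println!("{}", OneLine(&ToyReadExec)); // prints: ToyReadExec
}

Whatever fmt_as writes becomes the node's single-line label in the rendered stage tree, which is why trimming these impls is enough to change the explain output shown in the test snapshots further down.
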
src/stage/display.rs

Lines changed: 62 additions & 19 deletions
@@ -1,3 +1,15 @@
+use super::ExecutionStage;
+use crate::plan::PartitionIsolatorExec;
+use crate::{
+    task::{format_pg, ExecutionTask},
+    ArrowFlightReadExec,
+};
+use datafusion::physical_plan::{displayable, ExecutionPlan, ExecutionPlanProperties};
+use datafusion::{
+    error::Result,
+    physical_plan::{DisplayAs, DisplayFormatType},
+};
+use itertools::Itertools;
 /// Be able to display a nice tree for stages.
 ///
 /// The challenge to doing this at the moment is that `TreeRenderVistor`
@@ -12,24 +24,47 @@
 /// the Stage tree.
 use std::fmt::Write;
 
-use datafusion::{
-    error::Result,
-    physical_plan::{DisplayAs, DisplayFormatType},
-};
-
-use crate::{
-    common::util::display_plan_with_partition_in_out,
-    task::{format_pg, ExecutionTask},
-};
-
-use super::ExecutionStage;
-
 // Unicode box-drawing characters for creating borders and connections.
 const LTCORNER: &str = "┌"; // Left top corner
 const LDCORNER: &str = "└"; // Left bottom corner
 const VERTICAL: &str = "│"; // Vertical line
 const HORIZONTAL: &str = "─"; // Horizontal line
 
+impl ExecutionStage {
+    fn format(&self, plan: &dyn ExecutionPlan, indent: usize, f: &mut String) -> std::fmt::Result {
+        let mut node_str = displayable(plan).one_line().to_string();
+        node_str.pop();
+        write!(f, "{} {node_str}", " ".repeat(indent))?;
+
+        if let Some(ArrowFlightReadExec::Ready(ready)) =
+            plan.as_any().downcast_ref::<ArrowFlightReadExec>()
+        {
+            let Some(input_stage) = &self.child_stages_iter().find(|v| v.num == ready.stage_num)
+            else {
+                writeln!(f, "Wrong partition number {}", ready.stage_num)?;
+                return Ok(());
+            };
+            let tasks = input_stage.tasks.len();
+            let partitions = plan.output_partitioning().partition_count();
+            let stage = ready.stage_num;
+            write!(
+                f,
+                " input_stage={stage}, input_partitions={partitions}, input_tasks={tasks}",
+            )?;
+        }
+
+        if plan.as_any().is::<PartitionIsolatorExec>() {
+            write!(f, " {}", format_tasks(&self.tasks))?;
+        }
+        writeln!(f)?;
+
+        for child in plan.children() {
+            self.format(child.as_ref(), indent + 2, f)?;
+        }
+        Ok(())
+    }
+}
+
 impl DisplayAs for ExecutionStage {
     fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         #[allow(clippy::format_in_format_args)]
@@ -46,8 +81,9 @@ impl DisplayAs for ExecutionStage {
             format!(" {} ", self.name),
             format_tasks(&self.tasks),
         )?;
-        let plan_str = display_plan_with_partition_in_out(self.plan.as_ref())
-            .map_err(|_| std::fmt::Error {})?;
+
+        let mut plan_str = String::new();
+        self.format(self.plan.as_ref(), 0, &mut plan_str)?;
         let plan_str = plan_str
             .split('\n')
             .filter(|v| !v.is_empty())
@@ -187,9 +223,16 @@ pub fn display_stage_graphviz(stage: &ExecutionStage) -> Result<String> {
 }
 
 fn format_tasks(tasks: &[ExecutionTask]) -> String {
-    tasks
-        .iter()
-        .map(|task| format!("{task}"))
-        .collect::<Vec<String>>()
-        .join(",")
+    let mut result = "Tasks: ".to_string();
+    for (i, task) in tasks.iter().enumerate() {
+        result += &format!("t{i}:[");
+        result += task
+            .partition_group
+            .iter()
+            .map(|v| format!("p{}", v))
+            .join(",")
+            .as_str();
+        result += "] "
+    }
+    result
 }
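
The new format_tasks output is what appears in the stage banners and next to PartitionIsolatorExec in the updated snapshots below, e.g. "Tasks: t0:[p0] t1:[p1] t2:[p2]". Here is a minimal, runnable sketch of the same formatting; the simplified ExecutionTask struct with a plain partition_group vector is an assumption made only for this example.

use itertools::Itertools;

// Hypothetical, simplified stand-in for the crate's ExecutionTask; only the
// partition_group field matters for this formatting.
struct ExecutionTask {
    partition_group: Vec<usize>,
}

// Same logic as the new format_tasks in the diff above.
fn format_tasks(tasks: &[ExecutionTask]) -> String {
    let mut result = "Tasks: ".to_string();
    for (i, task) in tasks.iter().enumerate() {
        result += &format!("t{i}:[");
        result += task
            .partition_group
            .iter()
            .map(|v| format!("p{v}"))
            .join(",")
            .as_str();
        result += "] ";
    }
    result
}

fn main() {
    let tasks = vec![
        ExecutionTask { partition_group: vec![0, 1] },
        ExecutionTask { partition_group: vec![2, 3] },
    ];
    println!("{}", format_tasks(&tasks)); // prints: Tasks: t0:[p0,p1] t1:[p2,p3]
}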

tests/custom_extension_codec.rs

Lines changed: 12 additions & 12 deletions
@@ -61,20 +61,20 @@ mod tests {
         DistributedPhysicalOptimizerRule::default().distribute_plan(distributed_plan)?;
 
         assert_snapshot!(displayable(&distributed_plan).indent(true).to_string(), @r"
-        ┌───── Stage 3 Task: partitions: 0,unassigned]
-        partitions [out:1 <-- in:1 ] SortExec: expr=[numbers@0 DESC NULLS LAST], preserve_partitioning=[false]
-        partitions [out:1 <-- in:10 ] RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=10
-        partitions [out:10 ] ArrowFlightReadExec: Stage 2
+        ┌───── Stage 3 Tasks: t0:[p0]
+        │ SortExec: expr=[numbers@0 DESC NULLS LAST], preserve_partitioning=[false]
+        │ RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=10
+        ArrowFlightReadExec input_stage=2, input_partitions=10, input_tasks=1
         └──────────────────────────────────────────────────
-        ┌───── Stage 2 Task: partitions: 0..9,unassigned]
-        partitions [out:10 <-- in:1 ] RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
-        partitions [out:1 <-- in:1 ] SortExec: expr=[numbers@0 DESC NULLS LAST], preserve_partitioning=[false]
-        partitions [out:1 ] ArrowFlightReadExec: Stage 1
+        ┌───── Stage 2 Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9]
+        │ RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+        │ SortExec: expr=[numbers@0 DESC NULLS LAST], preserve_partitioning=[false]
+        ArrowFlightReadExec input_stage=1, input_partitions=1, input_tasks=1
         └──────────────────────────────────────────────────
-        ┌───── Stage 1 Task: partitions: 0,unassigned]
-        partitions [out:1 <-- in:1 ] RepartitionExec: partitioning=Hash([numbers@0], 1), input_partitions=1
-        partitions [out:1 <-- in:1 ] FilterExec: numbers@0 > 1
-        partitions [out:1 ] Int64ListExec: length=6
+        ┌───── Stage 1 Tasks: t0:[p0]
+        │ RepartitionExec: partitioning=Hash([numbers@0], 1), input_partitions=1
+        │ FilterExec: numbers@0 > 1
+        │ Int64ListExec: length=6
         └──────────────────────────────────────────────────
         ");
 

tests/distributed_aggregation.rs

Lines changed: 17 additions & 17 deletions
@@ -50,25 +50,25 @@ mod tests {
 
         assert_snapshot!(physical_distributed_str,
            @r"
-        ┌───── Stage 3 Task: partitions: 0,unassigned]
-        partitions [out:1 <-- in:1 ] ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday]
-        partitions [out:1 <-- in:3 ] SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
-        partitions [out:3 <-- in:3 ] SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
-        partitions [out:3 <-- in:3 ] ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
-        partitions [out:3 <-- in:3 ] AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
-        partitions [out:3 <-- in:3 ] CoalesceBatchesExec: target_batch_size=8192
-        partitions [out:3 ] ArrowFlightReadExec: Stage 2
+        ┌───── Stage 3 Tasks: t0:[p0]
+        │ ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday]
+        │ SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
+        │ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
+        │ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
+        │ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
+        │ CoalesceBatchesExec: target_batch_size=8192
+        ArrowFlightReadExec input_stage=2, input_partitions=3, input_tasks=3
         └──────────────────────────────────────────────────
-        ┌───── Stage 2 Task: partitions: 0,unassigned],Task: partitions: 1,unassigned],Task: partitions: 2,unassigned]
-        partitions [out:3 <-- in:1 ] RepartitionExec: partitioning=Hash([RainToday@0], 3), input_partitions=1
-        partitions [out:1 <-- in:3 ] PartitionIsolatorExec [providing upto 1 partitions]
-        partitions [out:3 ] ArrowFlightReadExec: Stage 1
+        ┌───── Stage 2 Tasks: t0:[p0] t1:[p1] t2:[p2]
+        │ RepartitionExec: partitioning=Hash([RainToday@0], 3), input_partitions=1
+        PartitionIsolatorExec Tasks: t0:[p0] t1:[p1] t2:[p2]
+        ArrowFlightReadExec input_stage=1, input_partitions=3, input_tasks=3
         └──────────────────────────────────────────────────
-        ┌───── Stage 1 Task: partitions: 0,unassigned],Task: partitions: 1,unassigned],Task: partitions: 2,unassigned]
-        partitions [out:3 <-- in:1 ] RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
-        partitions [out:1 <-- in:1 ] PartitionIsolatorExec [providing upto 1 partitions]
-        partitions [out:1 <-- in:1 ] AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
-        partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[RainToday], file_type=parquet
+        ┌───── Stage 1 Tasks: t0:[p0] t1:[p1] t2:[p2]
+        │ RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
+        PartitionIsolatorExec Tasks: t0:[p0] t1:[p1] t2:[p2]
+        │ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
+        │ DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[RainToday], file_type=parquet
         └──────────────────────────────────────────────────
         ",
         );

tests/highly_distributed_query.rs

Lines changed: 11 additions & 11 deletions
@@ -45,20 +45,20 @@ mod tests {
 
         assert_snapshot!(physical_distributed_str,
            @r"
-        ┌───── Stage 4 Task: partitions: 0..4,unassigned]
-        partitions [out:5 ] ArrowFlightReadExec: Stage 3
+        ┌───── Stage 4 Tasks: t0:[p0,p1,p2,p3,p4]
+        ArrowFlightReadExec input_stage=3, input_partitions=5, input_tasks=1
         └──────────────────────────────────────────────────
-        ┌───── Stage 3 Task: partitions: 0..4,unassigned]
-        partitions [out:5 <-- in:10 ] RepartitionExec: partitioning=RoundRobinBatch(5), input_partitions=10
-        partitions [out:10 ] ArrowFlightReadExec: Stage 2
+        ┌───── Stage 3 Tasks: t0:[p0,p1,p2,p3,p4]
+        │ RepartitionExec: partitioning=RoundRobinBatch(5), input_partitions=10
+        ArrowFlightReadExec input_stage=2, input_partitions=10, input_tasks=1
         └──────────────────────────────────────────────────
-        ┌───── Stage 2 Task: partitions: 0..9,unassigned]
-        partitions [out:10 <-- in:1 ] RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
-        partitions [out:1 ] ArrowFlightReadExec: Stage 1
+        ┌───── Stage 2 Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9]
+        │ RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+        ArrowFlightReadExec input_stage=1, input_partitions=1, input_tasks=1
         └──────────────────────────────────────────────────
-        ┌───── Stage 1 Task: partitions: 0,unassigned]
-        partitions [out:1 <-- in:1 ] RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=1
-        partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/flights-1m.parquet]]}, projection=[FL_DATE, DEP_DELAY, ARR_DELAY, AIR_TIME, DISTANCE, DEP_TIME, ARR_TIME], file_type=parquet
+        ┌───── Stage 1 Tasks: t0:[p0]
+        │ RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=1
+        │ DataSourceExec: file_groups={1 group: [[/testdata/flights-1m.parquet]]}, projection=[FL_DATE, DEP_DELAY, ARR_DELAY, AIR_TIME, DISTANCE, DEP_TIME, ARR_TIME], file_type=parquet
         └──────────────────────────────────────────────────
         ",
         );
