Skip to content

Commit a5b3a81

Browse files
authored
Show physical plan with metrics in benchmark (apache#662)
1 parent 9314dbb commit a5b3a81

File tree

2 files changed

+55
-10
lines changed

2 files changed

+55
-10
lines changed

benchmarks/src/bin/tpch.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use datafusion::prelude::*;
4242

4343
use datafusion::parquet::basic::Compression;
4444
use datafusion::parquet::file::properties::WriterProperties;
45+
use datafusion::physical_plan::display::DisplayableExecutionPlan;
4546
use structopt::StructOpt;
4647

4748
#[cfg(feature = "snmalloc")]
@@ -343,21 +344,27 @@ async fn execute_query(
343344
debug: bool,
344345
) -> Result<Vec<RecordBatch>> {
345346
if debug {
346-
println!("Logical plan:\n{:?}", plan);
347+
println!("=== Logical plan ===\n{:?}\n", plan);
347348
}
348349
let plan = ctx.optimize(plan)?;
349350
if debug {
350-
println!("Optimized logical plan:\n{:?}", plan);
351+
println!("=== Optimized logical plan ===\n{:?}\n", plan);
351352
}
352353
let physical_plan = ctx.create_physical_plan(&plan)?;
353354
if debug {
354355
println!(
355-
"Physical plan:\n{}",
356+
"=== Physical plan ===\n{}\n",
356357
displayable(physical_plan.as_ref()).indent().to_string()
357358
);
358359
}
359-
let result = collect(physical_plan).await?;
360+
let result = collect(physical_plan.clone()).await?;
360361
if debug {
362+
println!(
363+
"=== Physical plan with metrics ===\n{}\n",
364+
DisplayableExecutionPlan::with_metrics(physical_plan.as_ref())
365+
.indent()
366+
.to_string()
367+
);
361368
pretty::print_batches(&result)?;
362369
}
363370
Ok(result)

datafusion/src/physical_plan/display.rs

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,27 @@ pub enum DisplayFormatType {
3333
/// Wraps an `ExecutionPlan` with various ways to display this plan
3434
pub struct DisplayableExecutionPlan<'a> {
3535
inner: &'a dyn ExecutionPlan,
36+
/// whether to show metrics or not
37+
with_metrics: bool,
3638
}
3739

3840
impl<'a> DisplayableExecutionPlan<'a> {
3941
/// Create a wrapper around an [`'ExecutionPlan'] which can be
4042
/// pretty printed in a variety of ways
4143
pub fn new(inner: &'a dyn ExecutionPlan) -> Self {
42-
Self { inner }
44+
Self {
45+
inner,
46+
with_metrics: false,
47+
}
48+
}
49+
50+
/// Create a wrapper around an [`'ExecutionPlan'] which can be
51+
/// pretty printed in a variety of ways
52+
pub fn with_metrics(inner: &'a dyn ExecutionPlan) -> Self {
53+
Self {
54+
inner,
55+
with_metrics: true,
56+
}
4357
}
4458

4559
/// Return a `format`able structure that produces a single line
@@ -53,15 +67,26 @@ impl<'a> DisplayableExecutionPlan<'a> {
5367
/// CsvExec: source=...",
5468
/// ```
5569
pub fn indent(&self) -> impl fmt::Display + 'a {
56-
struct Wrapper<'a>(&'a dyn ExecutionPlan);
70+
struct Wrapper<'a> {
71+
plan: &'a dyn ExecutionPlan,
72+
with_metrics: bool,
73+
}
5774
impl<'a> fmt::Display for Wrapper<'a> {
5875
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
5976
let t = DisplayFormatType::Default;
60-
let mut visitor = IndentVisitor { t, f, indent: 0 };
61-
accept(self.0, &mut visitor)
77+
let mut visitor = IndentVisitor {
78+
t,
79+
f,
80+
indent: 0,
81+
with_metrics: self.with_metrics,
82+
};
83+
accept(self.plan, &mut visitor)
6284
}
6385
}
64-
Wrapper(self.inner)
86+
Wrapper {
87+
plan: self.inner,
88+
with_metrics: self.with_metrics,
89+
}
6590
}
6691
}
6792

@@ -71,8 +96,10 @@ struct IndentVisitor<'a, 'b> {
7196
t: DisplayFormatType,
7297
/// Write to this formatter
7398
f: &'a mut fmt::Formatter<'b>,
74-
///with_schema: bool,
99+
/// Indent size
75100
indent: usize,
101+
/// whether to show metrics or not
102+
with_metrics: bool,
76103
}
77104

78105
impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> {
@@ -83,6 +110,17 @@ impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> {
83110
) -> std::result::Result<bool, Self::Error> {
84111
write!(self.f, "{:indent$}", "", indent = self.indent * 2)?;
85112
plan.fmt_as(self.t, self.f)?;
113+
if self.with_metrics {
114+
write!(
115+
self.f,
116+
", metrics=[{}]",
117+
plan.metrics()
118+
.iter()
119+
.map(|(k, v)| format!("{}={:?}", k, v.value))
120+
.collect::<Vec<_>>()
121+
.join(", ")
122+
)?;
123+
}
86124
writeln!(self.f)?;
87125
self.indent += 1;
88126
Ok(true)

0 commit comments

Comments
 (0)