Skip to content

Commit 3d3609c

Browse files
metrics: make MetricsWrapperExec transparent
Before this change, `name()` and `as_any()` on `MetricsWrapperExec` did not delegate calls to the wrapped node. This was a problem because a user could not traverse the plan and extract metrics from specific nodes. After this change, those methods are delegated so a user can now extract metrics from specific nodes.
1 parent 01b0866 commit 3d3609c

File tree

2 files changed

+30
-17
lines changed

2 files changed

+30
-17
lines changed

src/execution_plans/metrics.rs

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,14 @@ use datafusion::error::Result;
55
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
66
use datafusion::physical_plan::ExecutionPlan;
77
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, PlanProperties};
8+
use delegate::delegate;
89
use std::any::Any;
910
use std::fmt::{Debug, Formatter};
1011

1112
/// A transparent wrapper that delegates all execution to its child but returns custom metrics. This node is invisible during display.
1213
/// The structure of a plan tree is closely tied to the [TaskMetricsRewriter].
1314
pub struct MetricsWrapperExec {
14-
pub(crate) inner: Arc<dyn ExecutionPlan>,
15+
inner: Arc<dyn ExecutionPlan>,
1516
/// metrics for this plan node.
1617
metrics: MetricsSet,
1718
/// children is initially None. When used by the [TaskMetricsRewriter], the children will be updated
@@ -31,7 +32,7 @@ impl MetricsWrapperExec {
3132

3233
/// MetricsWrapperExec is invisible during display.
3334
impl DisplayAs for MetricsWrapperExec {
34-
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
35+
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
3536
self.inner.fmt_as(t, f)
3637
}
3738
}
@@ -44,18 +45,15 @@ impl Debug for MetricsWrapperExec {
4445
}
4546

4647
impl ExecutionPlan for MetricsWrapperExec {
47-
fn name(&self) -> &str {
48-
"MetricsWrapperExec"
49-
}
50-
51-
fn as_any(&self) -> &dyn Any {
52-
self
53-
}
54-
55-
fn properties(&self) -> &PlanProperties {
56-
self.inner.properties()
48+
delegate! {
49+
to self.inner {
50+
fn name(&self) -> &str;
51+
fn properties(&self) -> &PlanProperties;
52+
fn as_any(&self) -> &dyn Any;
53+
}
5754
}
5855

56+
/// Retrusn
5957
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
6058
match &self.children {
6159
Some(children) => children.iter().collect(),

src/metrics/task_metrics_rewriter.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -217,14 +217,14 @@ mod tests {
217217
use itertools::Itertools;
218218
use uuid::Uuid;
219219

220-
use datafusion::prelude::SessionConfig;
221-
use datafusion::prelude::SessionContext;
222-
use std::sync::Arc;
223-
224220
use crate::DistributedExt;
225221
use crate::DistributedPhysicalOptimizerRule;
226222
use crate::metrics::task_metrics_rewriter::MetricsWrapperExec;
223+
use datafusion::physical_plan::empty::EmptyExec;
227224
use datafusion::physical_plan::metrics::MetricsSet;
225+
use datafusion::prelude::SessionConfig;
226+
use datafusion::prelude::SessionContext;
227+
use std::sync::Arc;
228228

229229
async fn make_test_ctx() -> SessionContext {
230230
make_test_ctx_inner(false).await
@@ -459,7 +459,6 @@ mod tests {
459459
let is_partition_isolator =
460460
if let Some(metrics_wrapper) = plan.as_any().downcast_ref::<MetricsWrapperExec>() {
461461
metrics_wrapper
462-
.inner
463462
.as_any()
464463
.downcast_ref::<PartitionIsolatorExec>()
465464
.is_some()
@@ -492,4 +491,20 @@ mod tests {
492491
let rewritten_plan = rewrite_distributed_plan_with_metrics(plan).unwrap();
493492
assert_metrics_present_in_plan(&rewritten_plan);
494493
}
494+
495+
#[test]
496+
// An important feature of DF execution plans which we want to preserve is the ability
497+
// to traverse a plan and collect metrics from specific nodes. To do this, the wrapper must
498+
// allow access to the inner node. This test asserts that we support this.
499+
fn test_wrapped_node_is_accessible() {
500+
let example_node = Arc::new(EmptyExec::new(Arc::new(Schema::new(vec![Field::new(
501+
"id",
502+
DataType::Int32,
503+
false,
504+
)]))));
505+
506+
let wrapped = MetricsWrapperExec::new(example_node, MetricsSet::new());
507+
assert_eq!(wrapped.name(), "EmptyExec");
508+
assert!(wrapped.as_any().is::<EmptyExec>());
509+
}
495510
}

0 commit comments

Comments
 (0)