From 8d2030a5cbfe2424bf9c351030e0417309e3e694 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Thu, 23 Oct 2025 10:09:02 -0400 Subject: [PATCH] metrics: make MetricsWrapperExec transparent Before this change, `name()` and `as_any()` on `MetricsWrapperExec` did not delegate calls to the wrapped node. This was a problem because a user could not traverse the plan and extract metrics from specific nodes. After this change, those methods are delegated so a user can now extract metrics from specific nodes. --- src/execution_plans/metrics.rs | 22 ++++++++++------------ src/metrics/task_metrics_rewriter.rs | 25 ++++++++++++++++++++----- 2 files changed, 30 insertions(+), 17 deletions(-) diff --git a/src/execution_plans/metrics.rs b/src/execution_plans/metrics.rs index 9e0bc60..f905fe9 100644 --- a/src/execution_plans/metrics.rs +++ b/src/execution_plans/metrics.rs @@ -5,13 +5,14 @@ use datafusion::error::Result; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_plan::{DisplayAs, DisplayFormatType, PlanProperties}; +use delegate::delegate; use std::any::Any; use std::fmt::{Debug, Formatter}; /// A transparent wrapper that delegates all execution to its child but returns custom metrics. This node is invisible during display. /// The structure of a plan tree is closely tied to the [TaskMetricsRewriter]. pub struct MetricsWrapperExec { - pub(crate) inner: Arc, + inner: Arc, /// metrics for this plan node. metrics: MetricsSet, /// children is initially None. When used by the [TaskMetricsRewriter], the children will be updated @@ -31,7 +32,7 @@ impl MetricsWrapperExec { /// MetricsWrapperExec is invisible during display. impl DisplayAs for MetricsWrapperExec { - fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { self.inner.fmt_as(t, f) } } @@ -44,18 +45,15 @@ impl Debug for MetricsWrapperExec { } impl ExecutionPlan for MetricsWrapperExec { - fn name(&self) -> &str { - "MetricsWrapperExec" - } - - fn as_any(&self) -> &dyn Any { - self - } - - fn properties(&self) -> &PlanProperties { - self.inner.properties() + delegate! { + to self.inner { + fn name(&self) -> &str; + fn properties(&self) -> &PlanProperties; + fn as_any(&self) -> &dyn Any; + } } + /// Retrusn fn children(&self) -> Vec<&Arc> { match &self.children { Some(children) => children.iter().collect(), diff --git a/src/metrics/task_metrics_rewriter.rs b/src/metrics/task_metrics_rewriter.rs index 4c6812d..e170fe3 100644 --- a/src/metrics/task_metrics_rewriter.rs +++ b/src/metrics/task_metrics_rewriter.rs @@ -217,14 +217,14 @@ mod tests { use itertools::Itertools; use uuid::Uuid; - use datafusion::prelude::SessionConfig; - use datafusion::prelude::SessionContext; - use std::sync::Arc; - use crate::DistributedExt; use crate::DistributedPhysicalOptimizerRule; use crate::metrics::task_metrics_rewriter::MetricsWrapperExec; + use datafusion::physical_plan::empty::EmptyExec; use datafusion::physical_plan::metrics::MetricsSet; + use datafusion::prelude::SessionConfig; + use datafusion::prelude::SessionContext; + use std::sync::Arc; async fn make_test_ctx() -> SessionContext { make_test_ctx_inner(false).await @@ -459,7 +459,6 @@ mod tests { let is_partition_isolator = if let Some(metrics_wrapper) = plan.as_any().downcast_ref::() { metrics_wrapper - .inner .as_any() .downcast_ref::() .is_some() @@ -492,4 +491,20 @@ mod tests { let rewritten_plan = rewrite_distributed_plan_with_metrics(plan).unwrap(); assert_metrics_present_in_plan(&rewritten_plan); } + + #[test] + // An important feature of DF execution plans which we want to preserve is the ability + // to traverse a plan and collect metrics from specific nodes. To do this, the wrapper must + // allow access to the inner node. This test asserts that we support this. + fn test_wrapped_node_is_accessible() { + let example_node = Arc::new(EmptyExec::new(Arc::new(Schema::new(vec![Field::new( + "id", + DataType::Int32, + false, + )])))); + + let wrapped = MetricsWrapperExec::new(example_node, MetricsSet::new()); + assert_eq!(wrapped.name(), "EmptyExec"); + assert!(wrapped.as_any().is::()); + } }