Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions src/execution_plans/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ use datafusion::error::Result;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, PlanProperties};
use delegate::delegate;
use std::any::Any;
use std::fmt::{Debug, Formatter};

/// A transparent wrapper that delegates all execution to its child but returns custom metrics. This node is invisible during display.
/// The structure of a plan tree is closely tied to the [TaskMetricsRewriter].
pub struct MetricsWrapperExec {
pub(crate) inner: Arc<dyn ExecutionPlan>,
inner: Arc<dyn ExecutionPlan>,
/// metrics for this plan node.
metrics: MetricsSet,
/// children is initially None. When used by the [TaskMetricsRewriter], the children will be updated
Expand All @@ -31,7 +32,7 @@ impl MetricsWrapperExec {

/// MetricsWrapperExec is invisible during display.
impl DisplayAs for MetricsWrapperExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
self.inner.fmt_as(t, f)
}
}
Expand All @@ -44,18 +45,15 @@ impl Debug for MetricsWrapperExec {
}

impl ExecutionPlan for MetricsWrapperExec {
fn name(&self) -> &str {
"MetricsWrapperExec"
}

fn as_any(&self) -> &dyn Any {
self
}

fn properties(&self) -> &PlanProperties {
self.inner.properties()
delegate! {
to self.inner {
fn name(&self) -> &str;
fn properties(&self) -> &PlanProperties;
fn as_any(&self) -> &dyn Any;
}
}

/// Retrusn
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
match &self.children {
Some(children) => children.iter().collect(),
Expand Down
25 changes: 20 additions & 5 deletions src/metrics/task_metrics_rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,14 +217,14 @@ mod tests {
use itertools::Itertools;
use uuid::Uuid;

use datafusion::prelude::SessionConfig;
use datafusion::prelude::SessionContext;
use std::sync::Arc;

use crate::DistributedExt;
use crate::DistributedPhysicalOptimizerRule;
use crate::metrics::task_metrics_rewriter::MetricsWrapperExec;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::prelude::SessionConfig;
use datafusion::prelude::SessionContext;
use std::sync::Arc;

async fn make_test_ctx() -> SessionContext {
make_test_ctx_inner(false).await
Expand Down Expand Up @@ -459,7 +459,6 @@ mod tests {
let is_partition_isolator =
if let Some(metrics_wrapper) = plan.as_any().downcast_ref::<MetricsWrapperExec>() {
metrics_wrapper
.inner
.as_any()
.downcast_ref::<PartitionIsolatorExec>()
.is_some()
Expand Down Expand Up @@ -492,4 +491,20 @@ mod tests {
let rewritten_plan = rewrite_distributed_plan_with_metrics(plan).unwrap();
assert_metrics_present_in_plan(&rewritten_plan);
}

#[test]
// An important feature of DF execution plans which we want to preserve is the ability
// to traverse a plan and collect metrics from specific nodes. To do this, the wrapper must
// allow access to the inner node. This test asserts that we support this.
fn test_wrapped_node_is_accessible() {
let example_node = Arc::new(EmptyExec::new(Arc::new(Schema::new(vec![Field::new(
"id",
DataType::Int32,
false,
)]))));

let wrapped = MetricsWrapperExec::new(example_node, MetricsSet::new());
assert_eq!(wrapped.name(), "EmptyExec");
assert!(wrapped.as_any().is::<EmptyExec>());
}
}