Skip to content

Conversation

@gabotechs
Copy link
Collaborator

@gabotechs gabotechs commented Oct 6, 2025

Closes #177.

This PR changes how we structure the distributed plans, without changing how we execute them or how we display them (mostly).

The idea is to bring us closer to a normal DataFusion plan, and:

  • rather than nesting plans inside of StageExec plans:
┌─────────────────────────────┐
│          StageExec          │
│                             │
│┌───────────────────────────┐│
││                           ││
││      ProjectionExec       ││
││                           ││
│└───────────────────────────┘│
│┌───────────────────────────┐│
││                           ││
││   AggregateExec (final)   ││
││                           ││
│└───────────────────────────┘│
│┌───────────────────────────┐│
││                           ││
││    NetworkShuffleExec     ││
││                           ││
│└───────────────────────────┘│
└─────────────────────────────┘
               ▲               
               │               
               │               
               │               
┌─────────────────────────────┐
│          StageExec          │
│                             │
│┌───────────────────────────┐│
││                           ││
││      RepartitionExec      ││
││                           ││
│└───────────────────────────┘│
│┌───────────────────────────┐│
││                           ││
││  AggregateExec (partial)  ││
││                           ││
│└───────────────────────────┘│
│┌───────────────────────────┐│
││                           ││
││      DataSourceExec       ││
││                           ││
│└───────────────────────────┘│
└─────────────────────────────┘

To produce a flattened output as any other normal DataFusion plan

┌───────────────────────────┐
│                           │
│      DistributedExec      │
│                           │
└───────────────────────────┘
┌───────────────────────────┐
│                           │
│      ProjectionExec       │
│                           │
└───────────────────────────┘
┌───────────────────────────┐
│                           │
│   AggregateExec (final)   │
│                           │
└───────────────────────────┘
┌───────────────────────────┐
│                           │
│    NetworkShuffleExec     │
│                           │
└───────────────────────────┘
┌───────────────────────────┐
│                           │
│      RepartitionExec      │
│                           │
└───────────────────────────┘
┌───────────────────────────┐
│                           │
│  AggregateExec (partial)  │
│                           │
└───────────────────────────┘
┌───────────────────────────┐
│                           │
│      DataSourceExec       │
│                           │
└───────────────────────────┘

This will play better with DataFusion tooling that tries to traverse the full plan, reassign children, repartition everything, etc...

In the way, it also results in less code and simpler

" read_from=Stage {stage}, output_partitions={partitions}, n_tasks={n_tasks}, input_tasks={input_tasks}",
)?;
}
pub fn display_plan_ascii(plan: &dyn ExecutionPlan) -> String {
Copy link
Collaborator

@jayshrivastava jayshrivastava Oct 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using display_plan_ascii, can we make this more native? We can implement Display on PartitionIsolator, NetworkCoalesceExec, NetworkShuffleExec. Even though the display output may differ from what we have now, I think this plays well with DF principles. It will also work better with datafusion-tracing since it wraps every node. Using trait methods allows you to access self for each node without downcasting.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally, it would be nice if displayable, much like execute() worked out of the box without downcasting. I think truly "interoperating well" with datafusion actually means that we can use/implement these native methods. It gives me confidence that this would work with other people's DF extensions and custom nodes etc.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can actually get everything in your PR working without downcasting outside of the optimizer step but I'm totally lost on metrics sadly. I think we will need to contribute upstream.

The root issue that makes it hard is that I have no access to self nor the TaskContext while traversing a plan using the TreeNodeRewriter. We need a more native way to inject metrics IMO - in the ExecutionPlan trait.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using display_plan_ascii, can we make this more native?

You could that now, and the output is nice also, but it does not contain ascii graphics. I kept this function for backwards compatibility with previous ways of visualizing plans.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The root issue that makes it hard is that I have no access to self nor the TaskContext while traversing a plan using the TreeNodeRewriter. We need a more native way to inject metrics IMO - in the ExecutionPlan trait.

👍 that makes sense. Probably worth opening a discussion, maybe Ballista folks have something useful to say about that

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stepping back, I think it's impossible to actually render "natively". For any child node in vanilla DF, there's never more than one parent, I think. In our case, we could have NetworkShuffleExecs that read from the same task / stage, so we need to have our own renderer-visitor such as below to avoid rendering the same task twice. This seems right.

Copy link
Collaborator

@jayshrivastava jayshrivastava Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A more high level thought - maybe we should implement our own visitor which is capable of traversing tasks uniquely? Then we could display using that visitor. Not necessary in this PR but I'm just thinking about loud.

In the metrics use case, it would allow users to go find a particular node in a particular task and grab its metrics. Not sure how important that is - but that's certainly allowed in vanilla DF, so why not this project?

(Also I would still like to be able to inject metrics retroactively if possible by adding new_with_metrics, but the MetricsWrapperExec can get the job done in the mean time)

Copy link
Collaborator Author

@gabotechs gabotechs Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A more "native" render will look like this:

DistributedExec
  CoalescePartitionsExec
    [Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
      AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
        CoalesceBatchesExec: target_batch_size=8192
          [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=2
            RepartitionExec: partitioning=Hash([RainToday@0, WindGustDir@1], 8), input_partitions=4
              RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
                AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
                  PartitionIsolatorExec: t0:[p0,p1,__] t1:[__,__,p0] 
                    DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday, WindGustDir], file_type=parquet

This is what you currently get if you do:

        let display = displayable(physical_plan.as_ref()).indent(true).to_string();

I'd say, that's how a native render should look like. All our ASCII graphics trickery is probably not suitable to how DF wants to render things by default, and for the same reason that we have a separate method for rendering Graphviz, we probably also should use a separate method for rendering ASCII

@gabotechs gabotechs force-pushed the gabrielmusat/rework-stage-hierarchy-for-better-interoperability branch 3 times, most recently from 7d8f606 to a8483ab Compare October 7, 2025 14:03
@gabotechs gabotechs force-pushed the gabrielmusat/rework-stage-hierarchy-for-better-interoperability branch from a8483ab to e469202 Compare October 7, 2025 14:49
@gabotechs gabotechs marked this pull request as ready for review October 7, 2025 15:41
@gabotechs gabotechs changed the title Gabrielmusat/rework stage hierarchy for better interoperability Rework stage hierarchy for better interoperability Oct 7, 2025
@gabotechs gabotechs changed the title Rework stage hierarchy for better interoperability Rework execution plan hierarchy for better interoperability Oct 7, 2025
Copy link
Collaborator

@jayshrivastava jayshrivastava left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the delay. I've been thinking about this change a lot. Overall, this change is good 👍🏽. I will give it a line-by-line review tmr morning.

It would be valuable to ask Geoffrey and/or the DF maintainers about the downcasting thing which I've commented about below. I believe I'm pro-downcasting though, it seems like the DF way.

}
}

impl NetworkBoundaryExt for dyn ExecutionPlan {
Copy link
Collaborator

@jayshrivastava jayshrivastava Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this means we are able to support datafusion-tracing (even using this method). Any call to as_network_boundary(&self) on an InstrumentationExec which wraps a Network*, will not actually return a NetworkBoundary.

Even if you add this, we can never call it because we will never see a concrete InstrumentationExec.

impl NetworkBoundaryExt for InstrumentationExec {
    fn as_network_boundary(&self) -> Option<&dyn NetworkBoundary> {
         self.inner.as_network_boundary()
    }
}

We will only see dyn ExecutionPlan, which will call the blanket implementation you wrote here.

I also tried this

impl<T: ExecutionPlan + ?Sized> NetworkBoundaryExt for T {
    fn try_as_network_boundary(&self) -> Option<&dyn NetworkBoundary> {
        None
    }
}

impl NetworkBoundaryExt for InstrumentationExec {
    fn as_network_boundary(&self) -> Option<&dyn NetworkBoundary> {
         self.inner.as_network_boundary()
    }
}

but now the compiler throws an error because these are conflicting implementations (since InstrumentationExec implements ExecutionPlan + ?Sized.

This all boils down to specialization being unstable: rust-lang/rust#31844. You can't have a "default blanket implementation" for a trait and override it in stable rust.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ultimately, datafusion-distributed is at odds with datafusion-tracing because as_any().downcast_ref breaks under indirection caused by wrapper types like InstrumentedExec.

  • are wrappers like in datafusion-tracing an anti pattern?
  • is downcasting after planning an anti-pattern?

I suspect that InstrumentedExec is actually the anti pattern.

  • All other "wrappers" like CooperativeExec expose the children, so they would be (a) encountered during a plan traversal; and (b) downcastable. I asked claude and it seems that InstrumentedExec is the outlier - we could ask Geoffrey about why it's implemented this way.
  • The old StageExec implementation was problematic for the same reason - we were obfuscating the parent-child relationships.
  • When you want a transparent wrapper, why not just add it between two nodes and return an empty result in the impl Display?
  • As you mentioned before, as_any is available in the ExecutionPlan trait.

Copy link
Collaborator

@jayshrivastava jayshrivastava Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If InstrumentedExec just delegated its as_any to the underlying type, we would avoid the issue. This would be a good question for DF maintainers...

It's only used here to avoid double wrapping. I wonder why they even have to worry about that. It's supposed to be the last optimizer rule, so it shouldn't get run more than once...

https://github.com/datafusion-contrib/datafusion-tracing/blob/f0aee9ed2960fa101570ddc7ad11670c8ee64289/datafusion-tracing/src/instrument_rule.rs#L73

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I imagine if InstrumentedExec::as_any returned the underlaying ExecutionPlan, downcasting would work, but upon reassigning a new version of the node in a plan.transform_down() operation the InstrumentedExec wrapper will be lost.

I'd say, that still sounds like a better option than completely obfuscating the plan like InstrumentedExec is currently doing.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get the feeling we are not going to solve this problem in this PR though... we'll probably need some further work in other places.

urls: &[Url],
codec: &dyn PhysicalExtensionCodec,
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
let prepared = Arc::clone(&self.plan).transform_up(|plan| {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be done during planning? Totally fine to do as a follow up. Curious why we do it here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we do two things:

  1. URL assignation
  2. Stage plan encoding

we want to do 1) as late as possible, because otherwise, if we do it during planning, by the time we get to execute the plan, the assigned URLs might no longer be valid. By making the URL assignation happen as close to execution as possible, we reduce the risk of workers no longer being available. This is how it was done previously also, it's not really something new.

we want to do 2) right before execution because plans in the "encoded" state are not visible or inspectable or pretty much anything, and with a plan at hand, we might want to be able to display it or things like that.

}

/// Helper enum for storing either borrowed or owned trait object references
enum Referenced<'a, T: ?Sized> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like it would be useful as a util. Seems a bit out of place here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤷‍♂️ it's only really used here. I'm trying to be faithful to the "not put in a common module things that are not common".

Do you think this better belongs to the common module?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's okay to put here. We can move things into common if we ever see repetition.

" read_from=Stage {stage}, output_partitions={partitions}, n_tasks={n_tasks}, input_tasks={input_tasks}",
)?;
}
pub fn display_plan_ascii(plan: &dyn ExecutionPlan) -> String {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stepping back, I think it's impossible to actually render "natively". For any child node in vanilla DF, there's never more than one parent, I think. In our case, we could have NetworkShuffleExecs that read from the same task / stage, so we need to have our own renderer-visitor such as below to avoid rendering the same task twice. This seems right.

" read_from=Stage {stage}, output_partitions={partitions}, n_tasks={n_tasks}, input_tasks={input_tasks}",
)?;
}
pub fn display_plan_ascii(plan: &dyn ExecutionPlan) -> String {
Copy link
Collaborator

@jayshrivastava jayshrivastava Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A more high level thought - maybe we should implement our own visitor which is capable of traversing tasks uniquely? Then we could display using that visitor. Not necessary in this PR but I'm just thinking about loud.

In the metrics use case, it would allow users to go find a particular node in a particular task and grab its metrics. Not sure how important that is - but that's certainly allowed in vanilla DF, so why not this project?

(Also I would still like to be able to inject metrics retroactively if possible by adding new_with_metrics, but the MetricsWrapperExec can get the job done in the mean time)

use datafusion::physical_plan::ExecutionPlan;
use std::sync::Arc;

pub fn one_child(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: a doc comment would be nice
nit: would src/common/execution_plan.rs or src/execution_plans/common.rs make more sense?
nit: maybe require_one_child makes more sense

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

src/execution_plans/common.rs sounds like a good place for helpers that are used in more than one plan, but that are scoped to src/execution_plans.

Followed your suggestion with this, and with a couple other helpers that were also only used in src/execution_plans

}

/// Helper enum for storing either borrowed or owned trait object references
enum Referenced<'a, T: ?Sized> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's okay to put here. We can move things into common if we ever see repetition.

&self,
n_tasks: usize,
) -> Result<(Arc<dyn ExecutionPlan>, usize), DataFusionError>;
fn to_stage_info(&self, n_tasks: usize) -> Result<(Arc<dyn ExecutionPlan>, usize)>;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit confused by this. The implementations return an error if n_tasks > 1 or they are not in the pending state. I'm not sure why. It would be nice if this was captured in the name and/or comment.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment in NetworkCoalesceExec.

TL;DR: NetworkCoalesceExec coalesces N tasks into 1, therefore, it must run in exactly 1 task. That error is a controlled error that signals the planner to assign 1 task to it.

fn with_input_stage(&self, input_stage: Stage) -> Result<Arc<dyn ExecutionPlan>>;

/// Returns the assigned input [Stage], if any.
fn input_stage(&self) -> Option<&Stage>;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a blocker but this returns None IFF the boundary is pending, which only happens during planning. It would be nice to capture this semantic in the type system somehow. Just something to think about. I don't have any ideas at the moment.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 yeah agree... I also have no ideas

│ ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday]
│ SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
NetworkCoalesceExec read_from=Stage 2, output_partitions=8, input_tasks=2
[Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like this

…er-interoperability

# Conflicts:
#	src/common/mod.rs
#	src/execution_plans/network_coalesce.rs
#	src/execution_plans/network_shuffle.rs
#	src/flight_service/do_get.rs
@gabotechs gabotechs merged commit a638e6d into main Oct 9, 2025
4 checks passed
@gabotechs gabotechs deleted the gabrielmusat/rework-stage-hierarchy-for-better-interoperability branch October 9, 2025 06:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Distributed plans do not support further optimization passes

3 participants