Skip to content

Conversation

@askalt
Copy link
Contributor

@askalt askalt commented Dec 22, 2025

Which issue does this PR close?

Closes #19351

What changes are included in this PR?

This patch introduces the stateless physical plan feature. Currently, the physical-plan crate is fully supported. This feature allows for the reuse of physical plans and their concurrent execution.

The feature is implemented by adding a separate Cargo feature named "stateless_plan". The implementation consists of several parts:

State tree

With the "stateless_plan" feature enabled, the plans themselves do not store state. The state is stored in a separate tree composed of PlanStateNodes, which is built lazily during plan execution. Each node of the tree stores not only the shared state of the plan but also its metrics. The shape of the state tree matches the shape of the execution plan tree.

Metrics

Metrics are stored in the nodes of the state tree and can be accessed after plan execution. Support is provided for performing EXPLAIN using the state.

Dynamic Filters

In the case of stateless plans, dynamic filters cannot simply be stored inside the plans, as the same plan can be executed concurrently. To overcome this, a dynamic filter is split into two parts: a planning-time version and an execution-time version. The plans contain the planning-time version, which is transformed into the execution version during the execution phase and then passed from parent nodes to child nodes using the state tree.

WorkTable

Instead of explicitly injecting the WorkTable into nodes, RecursiveExec exposes the WorkTable in the state stored within the State Tree. Then, a node interested in obtaining the WorkTable traverses up the State Tree and thus retrieves the current WorkTable.

Are these changes tested?

Currently only locally as the patch introduces a new isolated feature which is not tested in CI yet.

Following work

  • Support stateless plan for all other DataFusion crates.
  • Enable running tests with this feature in CI.
  • Deprecate stateful plans to eventually transition completely to the stateless version.
  • Add fmt_as_with_state to allow plans to include state-specific details in the EXPLAIN output, such as dynamic filters.

This patch introduces the stateless physical plan feature. Currently, the physical-plan crate
is fully supported. This feature allows for the reuse of physical plans and their concurrent
execution.

The feature is implemented by adding a separate Cargo feature named "stateless_plan".
The implementation consists of several parts:

* State tree.

With the "stateless_plan" feature enabled, the plans themselves do not store state. The state
is stored in a separate tree composed of PlanStateNodes, which is built lazily during plan execution.
Each node of the tree stores not only the shared state of the plan but also its metrics. The shape
of the state tree matches the shape of the execution plan tree.

* Metrics

Metrics are stored in the nodes of the state tree and can be accessed after plan execution. Support
is provided for performing EXPLAIN using the state.

* Dynamic Filters

In the case of stateless plans, dynamic filters cannot simply be stored inside the plans, as the same
plan can be executed concurrently. To overcome this, a dynamic filter is split into two parts: a planning-time
version and an execution-time version. The plans contain the planning-time version, which is transformed into
the execution version during the execution phase and then passed from parent nodes to child nodes using the
state tree.

* WorkTable

Instead of explicitly injecting the WorkTable into nodes, RecursiveExec exposes the WorkTable in the state stored
within the State Tree. Then, a node interested in obtaining the WorkTable traverses up the State Tree and thus
retrieves the current WorkTable.

Planned following work:

- Support stateless plan for all other DataFusion crates.
- Enable running tests with this feature in CI.
- Deprecate stateful plans to eventually transition completely to the stateless version.
- Add `fmt_as_with_state` to allow plans to include state-specific details in the EXPLAIN output, such as dynamic filters.

Closes apache#19351
@github-actions github-actions bot added physical-expr Changes to the physical-expr crates execution Related to the execution crate datasource Changes to the datasource crate physical-plan Changes to the physical-plan crate labels Dec 22, 2025
@askalt
Copy link
Contributor Author

askalt commented Dec 22, 2025

The following diagram explains the implementantion:

Screenshot from 2025-12-22 23-30-49

Where:

  • Green rectangles are physical plan nodes.
  • Orange rectangles are corresponding plan state nodes.
  • Blue arrows are strong refs.
  • Read arrows are weak refs.

@askalt
Copy link
Contributor Author

askalt commented Dec 24, 2025

@alamb Could you please help to review this proof of concept? There is a rather large diff because all physical plan states have been moved from the plan itself. However, the core idea can be grasped quickly—for example, by looking into the changes in https://github.com/apache/datafusion/blob/7e009ff2db1b2fbd2760923497f18b2a7c46557e/datafusion/physical-plan/src/state.rs

@askalt
Copy link
Contributor Author

askalt commented Dec 29, 2025

Added a benchmark to show the difference in performance. It compares two approaches that allow to re-use a plan (we do not consider plan re-building as it spends too much on optimizations, this work can be done once).

Used plan is relatively small:

ProjectionExec
   FilterExec
     AggregateExec: mode=Final
       CoalescePartitionsExec
         AggregateExec: mode=Partial
           RepartitionExec
             FilterExec
               TestMemoryExec

The plan is generated by default planner for the query like:

SELECT aggr1(col1) as aggr1, aggr2(col2) as aggr2 FROM t
WHERE p1
HAVING p2

Re-set plan state on each use

#[cfg(not(feature = "stateless_plan"))]
fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    plan.transform_up(|plan| {
        let new_plan = Arc::clone(&plan).reset_state()?;
        Ok(Transformed::yes(new_plan))
    })
    .unwrap()
    .data
}

This requires to re-compute plans properties, as for many plans the method with_new_children uses Self::try_new to build a new plan. For example, FilterExec, AggregateExec, ProjectionExec. So, on each iteration we re-build the whole tree.

On my machine:

$ cargo bench    --bench plan_reuse
...
time:   [4.4009 ms 4.4068 ms 4.4130 ms]

Use stateless_plan feature

Instead of re-building the whole plan on each iteration, we build a state tree.

It gives:

$ cargo bench  --features stateless_plan   --bench plan_reuse

time:   [72.424 µs 72.607 µs 72.795 µs]

Which does not depend on with_new_children and Self::try_new logic analysis because they are not called at all. So this approach is scaled better when a plan nodes number increases.

@alamb
Copy link
Contributor

alamb commented Dec 30, 2025

This requires to re-compute plans properties, as for many plans the method with_new_children uses Self::try_new to build a new plan. For example, FilterExec, AggregateExec, ProjectionExec. So, on each iteration we re-build the whole tree.

I wonder if you have considered trying to optimize the re-creation of the plan (e.g optimizing with_new_children for the relevant nodes to avoid recomputing properties when the children have the same properties)

In general I would really love to help make DataFusion planning (much) faster -- I think we have all the pieces now, but it will take some focused profiling effort to knock down the things that consume time to plan

@askalt
Copy link
Contributor Author

askalt commented Dec 30, 2025

@alamb, Thank you for the feedback!

I wonder if you have considered trying to optimize the re-creation of the plan (e.g optimizing with_new_children for the relevant nodes to avoid recomputing properties when the children have the same properties)

It seems this approach implies that we must somehow know that the properties remain unchanged for each particular plan. That sounds much harder to me than extracting state from the plan and not calling with_new_children at all -- in other words, avoiding analysis unless it is required.

In general I would really love to help make DataFusion planning (much) faster -- I think we have all the pieces now, but it will take some focused profiling effort to knock down the things that consume time to plan

A planning performance boost would make much more sense to me, but in this issue and PR, I am only considering the situation where the plan is already built, optimized, and ready to be reused as an artifact (since re-planning can sometimes run in the background to account for changes in statistics, etc.). What do you think about the introduced feature? Can we move the state out of the plans to make re-execution cheaper (of-course, under the feature flag, as it requires to re-design some ExecutionPlan trait methods)?

@alamb
Copy link
Contributor

alamb commented Dec 31, 2025

In general I would really love to help make DataFusion planning (much) faster -- I think we have all the pieces now, but it will take some focused profiling effort to knock down the things that consume time to plan

A planning performance boost would make much more sense to me, but in this issue and PR, I am only considering the situation where the plan is already built, optimized, and ready to be reused as an artifact (since re-planning can sometimes run in the background to account for changes in statistics, etc.). What do you think about the introduced feature? Can we move the state out of the plans to make re-execution cheaper (of-course, under the feature flag, as it requires to re-design some ExecutionPlan trait methods)?

I think the idea of moving state out of the plans is a nice design in theory. However, I am concerned that the practical ability to actually migrate the codebase (and all the consumers of DataFusion) to this pattern.

It seems this approach implies that we must somehow know that the properties remain unchanged for each particular plan. That sounds much harder to me than extracting state from the plan and not calling with_new_children at all -- in other words, avoiding analysis unless it is required.

I think it could potentially be harder to implement the feature, but I think it would be much easier to migrate all downstream consumers (as they wouldn't have to do anything)

It might not be as bad as it sounds. For example, what if we added some sort of fingerprint (maybe a hash) for EquivalenceProperties that is very fast to compute. That would make it simple and straightforward to check for equality 🤔

@mbutrovich mbutrovich self-requested a review January 6, 2026 15:55
@askalt
Copy link
Contributor Author

askalt commented Jan 12, 2026

It might not be as bad as it sounds. For example, what if we added some sort of fingerprint (maybe a hash) for EquivalenceProperties that is very fast to compute. That would make it simple and straightforward to check for equality 🤔

It makes sense for me, I think, we can add a fast-path for with_new_children(...) when there are no changes in children properties.

There is another problem we should address. Unfortunately, currently plans are not reusable, also due to the dynamic filters stored in them. New versions of the filters should be created for each execution. For example, if we use the next function to re-use plans:

fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    plan.transform_up(|plan| {
        let new_plan = Arc::clone(&plan).reset_state()?;
        Ok(Transformed::yes(new_plan))
    })
    .unwrap()
    .data
}

Imagine that there is an AggregateExec in the plan. It owns a dynamic filter that is updated when aggregation is executed. For each plan execution, a separate instance of such a filter should be created and somehow pushed down to the child operator so that it can filter its inputs accordingly.

In the suggested approach (this patch), this is solved by splitting filters into two types: planning-time filters and execution-time filters. When execute(...) is called, an independent version of the filter is created and then pushed into the children using state. Perhaps we could improve reset_state(...) to support such pushes. It seems we cannot do it without reset_state modification as the node that wants to poll filters should take its new version from somewhere (or we can try to re-push it someway).

The same situation with working table of a recursive query plan: it should someway be re-created and pushed into children for each plan re-execution.

@alamb
Copy link
Contributor

alamb commented Jan 12, 2026

Imagine that there is an AggregateExec in the plan. It owns a dynamic filter that is updated when aggregation is executed. For each plan execution, a separate instance of such a filter should be created and somehow pushed down to the child operator so that it can filter its inputs accordingly.

Yeah this is a good point

In the suggested approach (this patch), this is solved by splitting filters into two types: planning-time filters and execution-time filters. When execute(...) is called, an independent version of the filter is created and then pushed into the children using state. Perhaps we could improve reset_state(...) to support such pushes. It seems we cannot do it without reset_state modification as the node that wants to poll filters should take its new version from somewhere (or we can try to re-push it someway).

I agree we would need some way / API in the dynamic predicates to update the pointers to the new operator instance, as part of running the plan again 🤔

@alamb
Copy link
Contributor

alamb commented Jan 12, 2026

It makes sense for me, I think, we can add a fast-path for with_new_children(...) when there are no changes in children properties.

This would be cool to see -- I bet it might improve planning substantially (not just for this usecase but in general)

@askalt
Copy link
Contributor Author

askalt commented Jan 13, 2026

I agree we would need some way / API in the dynamic predicates to update the pointers to the new operator instance, as part of running the plan again 🤔

Yes, and it works the same way for WorkTable from RecursiveExec. A table should also be re-created for each execution.
The suggested approach solves all these challenges by placing such state within the state tree itself.

However, I am concerned that the practical ability to actually migrate the codebase (and all the consumers of DataFusion) to this pattern.

Currently, the customer should take the following steps (for each ExecutionPlan) to migrate to the stateless plans suggested in the patch:

  1. If the plan stores metrics, remove them and use state.get_metrics(...) within execute(...).
  2. If the plan stores state, create a separate structure MyExecState, move the state into it, implement as_any(...) (and possibly dynamic_filters(...)), and then acquire and use this state via state.get_state::<MyExecState>() in execute(...).

As an example, HashJoinExec:

https://github.com/askalt/datafusion/blob/4275e0264a61fac347530b6a393ef7114a2f8767/datafusion/physical-plan/src/joins/hash_join/exec.rs#L395-L408

Regarding the DF crates -- I can migrate them myself (and in fact, I have already done so in a separate branch).
Do you feel this is still an excessive number of changes?

@askalt
Copy link
Contributor Author

askalt commented Jan 13, 2026

It makes sense for me, I think, we can add a fast-path for with_new_children(...) when there are no changes in children properties.

This would be cool to see -- I bet it might improve planning substantially (not just for this usecase but in general)

Btw, here is a proof of concept:

#19792

Currently, there are no dynamic filters and work table support (so plan can be re-used when dynamic filter feature is off and for not recursive queries).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

datasource Changes to the datasource crate execution Related to the execution crate physical-expr Changes to the physical-expr crates physical-plan Changes to the physical-plan crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Stateless execution plans for plan caching

2 participants