Add flow/subplan identification to physical planner for parallel execution #2815
+423
−0
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.



Overview
This PR implements automatic flow/subplan identification in the physical planner to enable more efficient parallel execution. The planner now identifies chains of operations that can be executed together on a worker without needing to report back interim snapshots.
Problem
The parallel execution engine needs to understand which operations can be grouped together and executed as a unit. Previously, the physical plan was just a DAG of operators without any grouping information, making it difficult to optimize parallel execution and minimize communication overhead between workers.
Solution
Added
identify_flows()method toPhysicalPlanthat automatically annotates each operator node with aflow_idindicating which flow it belongs to. Flows are linear sequences of stateless operators that can execute together. The algorithm breaks flows at natural boundaries:Boundary nodes are marked with
flow_id=Noneto indicate they require special handling.Example
For a query with joins and aggregations:
The physical plan creates the following flows:
This allows the parallel engine to:
Implementation Details
Algorithm:
Integration:
create_physical_plan()after the plan is constructedflow_idattribute that existing code can safely ignoreBenefits
Testing
Added comprehensive test suite covering:
All tests verify correct flow identification and proper handling of boundary nodes.
Files Changed
opteryx/models/physical_plan.py- Addedidentify_flows()method with comprehensive documentationopteryx/planner/physical_planner.py- Integrated flow identification into plan creationtests/query_execution/test_flow_identification.py- Full test coverage for all scenariosNext Steps
The parallel execution engine can now leverage flow information to:
Original prompt
✨ Let Copilot coding agent set things up for you — coding agent works faster and does higher quality work when set up for your repo.