Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions opteryx/models/physical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,28 @@

"""
The Physical Plan is a tree of nodes that represent the execution plan for a query.

Flow/Subplan Identification:
The PhysicalPlan includes logic to identify "flows" or "subplans" - chains of
operations that can be executed together without intermediate checkpoints.

A flow is a linear sequence of stateless operators. Flows break at:
- Stateful nodes (aggregates, sorts, etc.)
- Join nodes
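- Branch points (a node with multiple children ends its flow; each stateless child starts a new one)
- Merge points (a stateless node with multiple parents starts a new flow)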

Each node is annotated with a flow_id. Stateful/join nodes have flow_id=None
as they act as boundaries. This enables the parallel execution engine to send
entire flows to workers for efficient execution.

Example:
Scan -> Filter -> Project -> Aggregate -> Limit

Creates 2 flows:
- Flow 0: [Scan, Filter, Project]
- Aggregate: flow_id=None (boundary)
- Flow 1: [Limit]
"""

from typing import Optional
Expand Down Expand Up @@ -89,6 +111,112 @@ def label_join_legs(self):
if not any(r == "right" for s, t, r in tester):
raise InvalidInternalStateError("Join has no RIGHT leg")

def identify_flows(self):
    """
    Annotate every node in the plan with the flow (subplan) it belongs to.

    A flow is a linear chain of stateless, non-join operators. The intent is
    that a whole flow can be handed to a worker and executed without
    reporting interim results back. Flows break at:
    - Stateful nodes (``is_stateless`` is false)
    - Join nodes (``is_join`` is true)
    - Branch points (a parent with more than one outgoing edge: the parent
      keeps its flow, each stateless child starts a new one)
    - Merge points (a node with more than one incoming edge starts a new flow)

    Stateful and join nodes are boundaries: their ``flow_id`` is left as
    ``None``. Every other reachable node gets a string id of the form
    ``"flow_<n>"``, numbered from 0 in the order flows are discovered by a
    breadth-first walk from the plan's entry points.

    Example:
        For a plan: Scan -> Filter -> Project -> Aggregate -> Limit

        - Scan, Filter, Project -> flow_id "flow_0"
        - Aggregate             -> flow_id None (boundary)
        - Limit                 -> flow_id "flow_1"

        and the method returns 2.

    Returns:
        int: The number of flows identified in the plan.
    """
    from collections import deque

    # Clear any earlier annotation so a repeated call starts from a clean slate.
    for nid in self.nodes():
        self[nid].flow_id = None

    flow_counter = 0
    visited = set()

    # Breadth-first from the entry points. A node is only ever enqueued as an
    # entry point or by a parent that has just been processed, so when a node
    # with a single parent is handled, that parent's flow_id is already final.
    queue = deque(self.get_entry_points())

    while queue:
        nid = queue.popleft()

        # A node reachable via several parents can be queued more than once.
        if nid in visited:
            continue
        visited.add(nid)

        node = self[nid]

        # Only stateless, non-join nodes belong to a flow; everything else is
        # a boundary and keeps flow_id=None.
        if node.is_stateless and not node.is_join:
            flow_id = None

            incoming = self.ingoing_edges(nid)
            if len(incoming) == 1:
                # Edges are (source, target, relation) tuples.
                parent_nid = incoming[0][0]
                parent = self[parent_nid]

                # Continue the parent's flow only when the parent is itself
                # in a flow and this node is its sole child (no branching).
                if (
                    parent.is_stateless
                    and not parent.is_join
                    and parent.flow_id is not None
                    and len(self.outgoing_edges(parent_nid)) == 1
                ):
                    flow_id = parent.flow_id

            if flow_id is None:
                # Entry point, merge point, child of a branch, or child of a
                # boundary node: open a fresh flow.
                flow_id = f"flow_{flow_counter}"
                flow_counter += 1

            node.flow_id = flow_id

        # Boundary nodes are not in a flow, but the walk continues through them.
        for _, child, _ in self.outgoing_edges(nid):
            if child not in visited:
                queue.append(child)

    return flow_counter

def sensors(self):
readings = {}
for nid in self.nodes():
Expand Down
30 changes: 30 additions & 0 deletions opteryx/planner/physical_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,33 @@


def create_physical_plan(logical_plan, query_properties) -> PhysicalPlan:
"""
Creates a physical execution plan from a logical plan.

The physical plan is a directed acyclic graph (DAG) of operator nodes that
represent the actual execution steps. After creating the plan, this function
identifies "flows" or "subplans" - chains of operations that can be executed
together without needing to report back interim snapshots.

Flows are identified as follows:
- A flow is a sequence of stateless operators in a linear chain
- Flows break at:
* Stateful nodes (aggregates, joins, etc.) that require accumulation
* Branch points where one node has multiple children
* Merge points where one node has multiple parents
- Stateful/join nodes act as flow boundaries and are not part of any flow

Each node in the physical plan is annotated with a flow_id to indicate which
flow it belongs to (or None if it's a boundary node). This enables the parallel
execution engine to send entire flows to workers for efficient execution.

Parameters:
logical_plan: The logical plan to convert
query_properties: Query execution properties

Returns:
PhysicalPlan: The physical execution plan with flow annotations
"""
plan = PhysicalPlan()

for nid, logical_node in logical_plan.nodes(data=True):
Expand Down Expand Up @@ -100,4 +127,7 @@ def create_physical_plan(logical_plan, query_properties) -> PhysicalPlan:
for source, destination, relation in logical_plan.edges():
plan.add_edge(source, destination, relation)

# Identify flows/subplans for parallel execution
plan.identify_flows()

return plan
Loading