diff --git a/opteryx/models/physical_plan.py b/opteryx/models/physical_plan.py
index f7692fd05..513b0757c 100644
--- a/opteryx/models/physical_plan.py
+++ b/opteryx/models/physical_plan.py
@@ -5,6 +5,28 @@
 
 """
 The Physical Plan is a tree of nodes that represent the execution plan for a query.
+
+Flow/Subplan Identification:
+    The PhysicalPlan includes logic to identify "flows" or "subplans" - chains of
+    operations that can be executed together without intermediate checkpoints.
+
+    A flow is a linear sequence of stateless operators. Flows break at:
+    - Stateful nodes (aggregates, sorts, etc.)
+    - Join nodes
+    - Branch points (multiple children)
+    - Merge points (multiple parents)
+
+    Each node is annotated with a flow_id. Stateful/join nodes have flow_id=None
+    as they act as boundaries. This enables the parallel execution engine to send
+    entire flows to workers for efficient execution.
+
+    Example:
+        Scan -> Filter -> Project -> Aggregate -> Limit
+
+        Creates 2 flows:
+        - Flow 0: [Scan, Filter, Project]
+        - Aggregate: flow_id=None (boundary)
+        - Flow 1: [Limit]
 """
 
 from typing import Optional
@@ -89,6 +111,110 @@ def label_join_legs(self):
             if not any(r == "right" for s, t, r in tester):
                 raise InvalidInternalStateError("Join has no RIGHT leg")
 
+    def identify_flows(self):
+        """
+        Identify chains of operations that can be executed together as flows/subplans.
+
+        A flow is a sequence of operations that can be sent to a worker to execute
+        together without needing to report back interim snapshots. Flows break at:
+        - Stateful nodes (require accumulation across morsels)
+        - Join nodes (require coordination between legs)
+        - Branch points (nodes with multiple children)
+        - Merge points (nodes with multiple parents)
+
+        Stateful/join nodes are NOT included in flows - they act as boundaries.
+
+        Example:
+            For a plan: Scan -> Filter -> Project -> Aggregate -> Limit
+
+            This would create 2 flows:
+            - "flow_0": [Scan, Filter, Project] (all stateless, linear chain)
+            - "flow_1": [Limit] (after the stateful Aggregate boundary)
+            - Aggregate has flow_id=None (boundary node)
+
+            The parallel engine can send "flow_0" to a worker to execute all three
+            operations together, then the Aggregate runs serially, then "flow_1" can
+            be sent to a worker.
+
+        This method annotates each node with a flow_id: the string "flow_N" for
+        nodes that belong to a flow, or None for boundary nodes.
+
+        Returns:
+            int: The number of flows identified in the plan.
+        """
+        from collections import deque
+
+        # Initialize flow_id for all nodes
+        for nid in self.nodes():
+            node = self[nid]
+            node.flow_id = None
+
+        flow_counter = 0
+
+        # Walk the plan forwards (from entry points to exits). A node with exactly
+        # one parent is only ever enqueued by that parent, so the parent's flow_id
+        # has always been decided by the time the child inspects it.
+        queue = deque(self.get_entry_points())
+        visited = set()
+
+        while queue:
+            nid = queue.popleft()
+
+            if nid in visited:
+                continue
+
+            visited.add(nid)
+            node = self[nid]
+
+            # Stateful nodes and joins don't belong to flows - they are flow boundaries
+            if not node.is_stateless or node.is_join:
+                # Mark as visited but don't assign to a flow
+                # Add children to queue
+                for _, child, _ in self.outgoing_edges(nid):
+                    if child not in visited:
+                        queue.append(child)
+                continue
+
+            # Check if this node can start or continue a flow
+            incoming = self.ingoing_edges(nid)
+
+            # Determine if we should start a new flow or continue existing one
+            should_start_new_flow = True
+            parent_flow_id = None
+
+            if len(incoming) == 1:
+                parent_nid = incoming[0][0]
+                parent_node = self[parent_nid]
+                parent_outgoing = self.outgoing_edges(parent_nid)
+
+                # Can continue parent's flow if:
+                # - Parent is stateless (in a flow)
+                # - Parent has only one child (no branch)
+                # - Parent is not a join
+                if (parent_node.is_stateless and
+                    not parent_node.is_join and
+                    len(parent_outgoing) == 1 and
+                    parent_node.flow_id is not None):
+                    should_start_new_flow = False
+                    parent_flow_id = parent_node.flow_id
+
+            # Start new flow or continue parent's flow
+            if should_start_new_flow:
+                current_flow_id = f"flow_{flow_counter}"
+                flow_counter += 1
+            else:
+                current_flow_id = parent_flow_id
+
+            # Assign this node to the flow
+            node.flow_id = current_flow_id
+
+            # Add children to queue
+            for _, child, _ in self.outgoing_edges(nid):
+                if child not in visited:
+                    queue.append(child)
+
+        return flow_counter
+
     def sensors(self):
         readings = {}
         for nid in self.nodes():
diff --git a/opteryx/planner/physical_planner.py b/opteryx/planner/physical_planner.py
index 00d56d3c4..c0d5d816d 100644
--- a/opteryx/planner/physical_planner.py
+++ b/opteryx/planner/physical_planner.py
@@ -15,6 +15,33 @@
 
 
 def create_physical_plan(logical_plan, query_properties) -> PhysicalPlan:
+    """
+    Creates a physical execution plan from a logical plan.
+
+    The physical plan is a directed acyclic graph (DAG) of operator nodes that
+    represent the actual execution steps. After creating the plan, this function
+    identifies "flows" or "subplans" - chains of operations that can be executed
+    together without needing to report back interim snapshots.
+
+    Flows are identified as follows:
+    - A flow is a sequence of stateless operators in a linear chain
+    - Flows break at:
+      * Stateful nodes (aggregates, joins, etc.) that require accumulation
+      * Branch points where one node has multiple children
+      * Merge points where one node has multiple parents
+    - Stateful/join nodes act as flow boundaries and are not part of any flow
+
+    Each node in the physical plan is annotated with a flow_id to indicate which
+    flow it belongs to (or None if it's a boundary node). This enables the parallel
+    execution engine to send entire flows to workers for efficient execution.
+
+    Parameters:
+        logical_plan: The logical plan to convert
+        query_properties: Query execution properties
+
+    Returns:
+        PhysicalPlan: The physical execution plan with flow annotations
+    """
     plan = PhysicalPlan()
 
     for nid, logical_node in logical_plan.nodes(data=True):
@@ -100,4 +127,7 @@ def create_physical_plan(logical_plan, query_properties) -> PhysicalPlan:
     for source, destination, relation in logical_plan.edges():
         plan.add_edge(source, destination, relation)
 
+    # Identify flows/subplans for parallel execution
+    plan.identify_flows()
+
     return plan
diff --git a/tests/query_execution/test_flow_identification.py b/tests/query_execution/test_flow_identification.py
new file mode 100644
index 000000000..f45d9a7f1
--- /dev/null
+++ b/tests/query_execution/test_flow_identification.py
@@ -0,0 +1,302 @@
+"""
+Tests for flow identification in physical plans.
+
+Flows are chains of operations that can execute together without
+needing to report back interim snapshots.
+"""
+
+import os
+import sys
+
+sys.path.insert(1, os.path.join(sys.path[0], "../.."))
+
+from opteryx.models import PhysicalPlan
+
+
+class MockNode:
+    """Mock node for testing flow identification."""
+
+    def __init__(self, is_stateless=True, is_join=False, is_scan=False):
+        self.is_stateless = is_stateless
+        self.is_join = is_join
+        self.is_scan = is_scan
+        self.flow_id = None
+
+
+def test_linear_chain_of_stateless_nodes():
+    """Test that a linear chain of stateless nodes forms a single flow."""
+    plan = PhysicalPlan()
+
+    # Create a linear chain: scan -> filter1 -> filter2 -> project
+    scan = MockNode(is_stateless=True, is_scan=True)
+    filter1 = MockNode(is_stateless=True)
+    filter2 = MockNode(is_stateless=True)
+    project = MockNode(is_stateless=True)
+
+    plan.add_node("scan", scan)
+    plan.add_node("filter1", filter1)
+    plan.add_node("filter2", filter2)
+    plan.add_node("project", project)
+
+    plan.add_edge("scan", "filter1")
+    plan.add_edge("filter1", "filter2")
+    plan.add_edge("filter2", "project")
+
+    # Identify flows
+    num_flows = plan.identify_flows()
+
+    # All stateless nodes in a chain should be in the same flow
+    assert scan.flow_id == filter1.flow_id == filter2.flow_id == project.flow_id
+    assert num_flows >= 1
+
+
+def test_stateful_node_breaks_flow():
+    """Test that stateful nodes break flows."""
+    plan = PhysicalPlan()
+
+    # Create chain: scan -> filter -> aggregate -> project
+    scan = MockNode(is_stateless=True, is_scan=True)
+    filter_node = MockNode(is_stateless=True)
+    aggregate = MockNode(is_stateless=False)  # Stateful
+    project = MockNode(is_stateless=True)
+
+    plan.add_node("scan", scan)
+    plan.add_node("filter", filter_node)
+    plan.add_node("aggregate", aggregate)
+    plan.add_node("project", project)
+
+    plan.add_edge("scan", "filter")
+    plan.add_edge("filter", "aggregate")
+    plan.add_edge("aggregate", "project")
+
+    # Identify flows
+    num_flows = plan.identify_flows()
+
+    # scan and filter should be in one flow
+    assert scan.flow_id == filter_node.flow_id
+
+    # aggregate should NOT be in a flow (it's stateful - acts as a boundary)
+    assert aggregate.flow_id is None
+
+    # project should be in a different flow from scan/filter
+    assert project.flow_id != filter_node.flow_id
+    assert num_flows >= 2
+
+
+def test_join_breaks_flow():
+    """Test that join nodes break flows."""
+    plan = PhysicalPlan()
+
+    # Create a join scenario
+    scan_left = MockNode(is_stateless=True, is_scan=True)
+    scan_right = MockNode(is_stateless=True, is_scan=True)
+    join = MockNode(is_stateless=False, is_join=True)
+    project = MockNode(is_stateless=True)
+
+    plan.add_node("scan_left", scan_left)
+    plan.add_node("scan_right", scan_right)
+    plan.add_node("join", join)
+    plan.add_node("project", project)
+
+    plan.add_edge("scan_left", "join", "left")
+    plan.add_edge("scan_right", "join", "right")
+    plan.add_edge("join", "project")
+
+    # Identify flows
+    num_flows = plan.identify_flows()
+
+    # Each scan should be in its own flow
+    assert scan_left.flow_id != scan_right.flow_id
+
+    # Join should NOT be in a flow (it's a join node - acts as a boundary)
+    assert join.flow_id is None
+
+    # Project should be in a different flow from both scans
+    assert project.flow_id != scan_left.flow_id
+    assert project.flow_id != scan_right.flow_id
+    assert num_flows >= 3
+
+
+def test_branch_breaks_flow():
+    """Test that branching breaks flows."""
+    plan = PhysicalPlan()
+
+    # Create branching: scan -> filter -> [project1, project2]
+    scan = MockNode(is_stateless=True, is_scan=True)
+    filter_node = MockNode(is_stateless=True)
+    project1 = MockNode(is_stateless=True)
+    project2 = MockNode(is_stateless=True)
+
+    plan.add_node("scan", scan)
+    plan.add_node("filter", filter_node)
+    plan.add_node("project1", project1)
+    plan.add_node("project2", project2)
+
+    plan.add_edge("scan", "filter")
+    plan.add_edge("filter", "project1")
+    plan.add_edge("filter", "project2")
+
+    # Identify flows
+    num_flows = plan.identify_flows()
+
+    # scan and filter should be in the same flow
+    assert scan.flow_id == filter_node.flow_id
+
+    # Each branch should start a new flow (filter has multiple children)
+    assert project1.flow_id != filter_node.flow_id
+    assert project2.flow_id != filter_node.flow_id
+    # The two branches should be in different flows
+    assert project1.flow_id != project2.flow_id
+    assert num_flows >= 3
+
+
+def test_merge_breaks_flow():
+    """Test that merge points break flows."""
+    plan = PhysicalPlan()
+
+    # Create merge: [scan1, scan2] -> union -> project
+    scan1 = MockNode(is_stateless=True, is_scan=True)
+    scan2 = MockNode(is_stateless=True, is_scan=True)
+    union = MockNode(is_stateless=False)  # Stateful union
+    project = MockNode(is_stateless=True)
+
+    plan.add_node("scan1", scan1)
+    plan.add_node("scan2", scan2)
+    plan.add_node("union", union)
+    plan.add_node("project", project)
+
+    plan.add_edge("scan1", "union")
+    plan.add_edge("scan2", "union")
+    plan.add_edge("union", "project")
+
+    # Identify flows
+    num_flows = plan.identify_flows()
+
+    # Each scan should be in its own flow
+    assert scan1.flow_id != scan2.flow_id
+
+    # Union should NOT be in a flow (it's stateful - acts as a boundary)
+    assert union.flow_id is None
+
+    # Project should be in its own flow
+    assert project.flow_id != scan1.flow_id
+    assert project.flow_id != scan2.flow_id
+    assert num_flows >= 3
+
+
+def test_stateless_merge_starts_new_flow():
+    """Test that a stateless node with multiple parents starts its own flow."""
+    plan = PhysicalPlan()
+
+    # Create merge: [scan1, scan2] -> combine -> project (all stateless)
+    scan1 = MockNode(is_stateless=True, is_scan=True)
+    scan2 = MockNode(is_stateless=True, is_scan=True)
+    combine = MockNode(is_stateless=True)
+    project = MockNode(is_stateless=True)
+
+    plan.add_node("scan1", scan1)
+    plan.add_node("scan2", scan2)
+    plan.add_node("combine", combine)
+    plan.add_node("project", project)
+
+    plan.add_edge("scan1", "combine")
+    plan.add_edge("scan2", "combine")
+    plan.add_edge("combine", "project")
+
+    # Identify flows
+    num_flows = plan.identify_flows()
+
+    # Each scan should be in its own flow
+    assert scan1.flow_id != scan2.flow_id
+
+    # The merge point should start a new flow rather than join either parent's
+    assert combine.flow_id is not None
+    assert combine.flow_id != scan1.flow_id
+    assert combine.flow_id != scan2.flow_id
+
+    # Project has one stateless parent with one child, so it continues that flow
+    assert project.flow_id == combine.flow_id
+    assert num_flows >= 3
+
+
+def test_complex_query_flow():
+    """Test flow identification on a more complex query structure."""
+    plan = PhysicalPlan()
+
+    # Create a complex plan representing:
+    # SELECT ... FROM table1 JOIN table2 WHERE ... GROUP BY ... ORDER BY ... LIMIT
+    #
+    # Scan1 -> Filter1 \
+    #                    Join -> Project -> Aggregate -> Sort -> Limit
+    # Scan2 -> Filter2 /
+
+    scan1 = MockNode(is_stateless=True, is_scan=True)
+    filter1 = MockNode(is_stateless=True)
+    scan2 = MockNode(is_stateless=True, is_scan=True)
+    filter2 = MockNode(is_stateless=True)
+    join = MockNode(is_stateless=False, is_join=True)
+    project = MockNode(is_stateless=True)
+    aggregate = MockNode(is_stateless=False)
+    sort = MockNode(is_stateless=False)
+    limit = MockNode(is_stateless=True)
+
+    plan.add_node("scan1", scan1)
+    plan.add_node("filter1", filter1)
+    plan.add_node("scan2", scan2)
+    plan.add_node("filter2", filter2)
+    plan.add_node("join", join)
+    plan.add_node("project", project)
+    plan.add_node("aggregate", aggregate)
+    plan.add_node("sort", sort)
+    plan.add_node("limit", limit)
+
+    # Left side of join
+    plan.add_edge("scan1", "filter1")
+    plan.add_edge("filter1", "join", "left")
+
+    # Right side of join
+    plan.add_edge("scan2", "filter2")
+    plan.add_edge("filter2", "join", "right")
+
+    # After join
+    plan.add_edge("join", "project")
+    plan.add_edge("project", "aggregate")
+    plan.add_edge("aggregate", "sort")
+    plan.add_edge("sort", "limit")
+
+    # Identify flows
+    num_flows = plan.identify_flows()
+
+    # Flow 0: scan1 -> filter1 (left side of join)
+    assert scan1.flow_id == filter1.flow_id
+
+    # Flow 1: scan2 -> filter2 (right side of join)
+    assert scan2.flow_id == filter2.flow_id
+
+    # Flows should be different for left and right
+    assert scan1.flow_id != scan2.flow_id
+
+    # Join is a boundary
+    assert join.flow_id is None
+
+    # Flow 2: project (after join, before aggregate)
+    assert project.flow_id is not None
+    assert project.flow_id != scan1.flow_id
+    assert project.flow_id != scan2.flow_id
+
+    # Aggregate and Sort are boundaries
+    assert aggregate.flow_id is None
+    assert sort.flow_id is None
+
+    # Flow 3: limit (after sort)
+    assert limit.flow_id is not None
+    assert limit.flow_id != project.flow_id
+
+    # Should have at least 4 flows (left, right, project, limit)
+    assert num_flows >= 4
+
+
+if __name__ == "__main__":  # pragma: no cover
+    from tests.tools import run_tests
+
+    run_tests()