From 43af0471e35ce34c11023de64833164b218fde9f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 4 Oct 2025 18:56:31 +0000 Subject: [PATCH 1/4] Initial plan From 3c696e8032f45762c81d34a4efd7c7fcf6f7825e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 4 Oct 2025 19:06:01 +0000 Subject: [PATCH 2/4] Add flow/subplan identification to physical planner Co-authored-by: joocer <1688479+joocer@users.noreply.github.com> --- opteryx/models/physical_plan.py | 72 +++++++ opteryx/planner/physical_planner.py | 3 + .../test_flow_identification.py | 188 ++++++++++++++++++ 3 files changed, 263 insertions(+) create mode 100644 tests/query_execution/test_flow_identification.py diff --git a/opteryx/models/physical_plan.py b/opteryx/models/physical_plan.py index f7692fd05..c769a6eec 100644 --- a/opteryx/models/physical_plan.py +++ b/opteryx/models/physical_plan.py @@ -89,6 +89,78 @@ def label_join_legs(self): if not any(r == "right" for s, t, r in tester): raise InvalidInternalStateError("Join has no RIGHT leg") + def identify_flows(self): + """ + Identify chains of operations that can be executed together as flows/subplans. + + A flow is a sequence of operations that can be sent to a worker to execute + together without needing to report back interim snapshots. Flows break at: + - Stateful nodes (require accumulation across morsels) + - Join nodes (require coordination between legs) + - Branch points (nodes with multiple children) + - Merge points (nodes with multiple parents) + + Stateful/join nodes are NOT included in flows - they act as boundaries. + + This method annotates each node with a flow_id to indicate which flow it belongs to. + """ + from orso.tools import random_string + + # Initialize flow_id for all nodes + for nid in self.nodes(): + node = self[nid] + node.flow_id = None + + visited = set() + flow_counter = 0 + + # Process nodes in depth-first order + for nid, node in self.depth_first_search_flat(): + if nid in visited: + continue + + visited.add(nid) + + # Stateful nodes and joins don't belong to flows - they are flow boundaries + if not node.is_stateless or node.is_join: + # Mark as visited but don't assign to a flow + continue + + # Check if this node can start or continue a flow + incoming = self.ingoing_edges(nid) + + # Determine if we should start a new flow or continue existing one + should_start_new_flow = True + parent_flow_id = None + + if len(incoming) == 1: + parent_nid = incoming[0][0] + parent_node = self[parent_nid] + parent_outgoing = self.outgoing_edges(parent_nid) + + # Can continue parent's flow if: + # - Parent is stateless (in a flow) + # - Parent has only one child (no branch) + # - Parent is not a join + if (parent_node.is_stateless and + not parent_node.is_join and + len(parent_outgoing) == 1 and + parent_node.flow_id is not None): + should_start_new_flow = False + parent_flow_id = parent_node.flow_id + + # Start new flow or continue parent's flow + if should_start_new_flow: + current_flow_id = f"flow_{flow_counter}" + flow_counter += 1 + else: + current_flow_id = parent_flow_id + + # Assign this node to the flow + node.flow_id = current_flow_id + + return flow_counter + def sensors(self): readings = {} for nid in self.nodes(): diff --git a/opteryx/planner/physical_planner.py b/opteryx/planner/physical_planner.py index 00d56d3c4..4326b857d 100644 --- a/opteryx/planner/physical_planner.py +++ b/opteryx/planner/physical_planner.py @@ -100,4 +100,7 @@ def create_physical_plan(logical_plan, query_properties) -> PhysicalPlan: for source, destination, relation in logical_plan.edges(): plan.add_edge(source, destination, relation) + # Identify flows/subplans for parallel execution + plan.identify_flows() + return plan diff --git a/tests/query_execution/test_flow_identification.py b/tests/query_execution/test_flow_identification.py new file mode 100644 index 000000000..d6280648e --- /dev/null +++ b/tests/query_execution/test_flow_identification.py @@ -0,0 +1,188 @@ +""" +Tests for flow identification in physical plans. + +Flows are chains of operations that can execute together without +needing to report back interim snapshots. +""" + +import os +import sys + +sys.path.insert(1, os.path.join(sys.path[0], "../..")) + +from opteryx.models import PhysicalPlan + + +class MockNode: + """Mock node for testing flow identification.""" + + def __init__(self, is_stateless=True, is_join=False, is_scan=False): + self.is_stateless = is_stateless + self.is_join = is_join + self.is_scan = is_scan + self.flow_id = None + + +def test_linear_chain_of_stateless_nodes(): + """Test that a linear chain of stateless nodes forms a single flow.""" + plan = PhysicalPlan() + + # Create a linear chain: scan -> filter1 -> filter2 -> project + scan = MockNode(is_stateless=True, is_scan=True) + filter1 = MockNode(is_stateless=True) + filter2 = MockNode(is_stateless=True) + project = MockNode(is_stateless=True) + + plan.add_node("scan", scan) + plan.add_node("filter1", filter1) + plan.add_node("filter2", filter2) + plan.add_node("project", project) + + plan.add_edge("scan", "filter1") + plan.add_edge("filter1", "filter2") + plan.add_edge("filter2", "project") + + # Identify flows + num_flows = plan.identify_flows() + + # All stateless nodes in a chain should be in the same flow + assert scan.flow_id == filter1.flow_id == filter2.flow_id == project.flow_id + assert num_flows >= 1 + + +def test_stateful_node_breaks_flow(): + """Test that stateful nodes break flows.""" + plan = PhysicalPlan() + + # Create chain: scan -> filter -> aggregate -> project + scan = MockNode(is_stateless=True, is_scan=True) + filter_node = MockNode(is_stateless=True) + aggregate = MockNode(is_stateless=False) # Stateful + project = MockNode(is_stateless=True) + + plan.add_node("scan", scan) + plan.add_node("filter", filter_node) + plan.add_node("aggregate", aggregate) + plan.add_node("project", project) + + plan.add_edge("scan", "filter") + plan.add_edge("filter", "aggregate") + plan.add_edge("aggregate", "project") + + # Identify flows + num_flows = plan.identify_flows() + + # scan and filter should be in one flow + assert scan.flow_id == filter_node.flow_id + + # aggregate should NOT be in a flow (it's stateful - acts as a boundary) + assert aggregate.flow_id is None + + # project should be in a different flow from scan/filter + assert project.flow_id != filter_node.flow_id + assert num_flows >= 2 + + +def test_join_breaks_flow(): + """Test that join nodes break flows.""" + plan = PhysicalPlan() + + # Create a join scenario + scan_left = MockNode(is_stateless=True, is_scan=True) + scan_right = MockNode(is_stateless=True, is_scan=True) + join = MockNode(is_stateless=False, is_join=True) + project = MockNode(is_stateless=True) + + plan.add_node("scan_left", scan_left) + plan.add_node("scan_right", scan_right) + plan.add_node("join", join) + plan.add_node("project", project) + + plan.add_edge("scan_left", "join", "left") + plan.add_edge("scan_right", "join", "right") + plan.add_edge("join", "project") + + # Identify flows + num_flows = plan.identify_flows() + + # Each scan should be in its own flow + assert scan_left.flow_id != scan_right.flow_id + + # Join should NOT be in a flow (it's a join node - acts as a boundary) + assert join.flow_id is None + + # Project should be in a different flow from both scans + assert project.flow_id != scan_left.flow_id + assert project.flow_id != scan_right.flow_id + assert num_flows >= 3 + + +def test_branch_breaks_flow(): + """Test that branching breaks flows.""" + plan = PhysicalPlan() + + # Create branching: scan -> filter -> [project1, project2] + scan = MockNode(is_stateless=True, is_scan=True) + filter_node = MockNode(is_stateless=True) + project1 = MockNode(is_stateless=True) + project2 = MockNode(is_stateless=True) + + plan.add_node("scan", scan) + plan.add_node("filter", filter_node) + plan.add_node("project1", project1) + plan.add_node("project2", project2) + + plan.add_edge("scan", "filter") + plan.add_edge("filter", "project1") + plan.add_edge("filter", "project2") + + # Identify flows + num_flows = plan.identify_flows() + + # scan and filter should be in the same flow + assert scan.flow_id == filter_node.flow_id + + # Each branch should start a new flow (filter has multiple children) + assert project1.flow_id != filter_node.flow_id + assert project2.flow_id != filter_node.flow_id + # The two branches should be in different flows + assert project1.flow_id != project2.flow_id + + +def test_merge_breaks_flow(): + """Test that merge points break flows.""" + plan = PhysicalPlan() + + # Create merge: [scan1, scan2] -> union -> project + scan1 = MockNode(is_stateless=True, is_scan=True) + scan2 = MockNode(is_stateless=True, is_scan=True) + union = MockNode(is_stateless=False) # Stateful union + project = MockNode(is_stateless=True) + + plan.add_node("scan1", scan1) + plan.add_node("scan2", scan2) + plan.add_node("union", union) + plan.add_node("project", project) + + plan.add_edge("scan1", "union") + plan.add_edge("scan2", "union") + plan.add_edge("union", "project") + + # Identify flows + num_flows = plan.identify_flows() + + # Each scan should be in its own flow + assert scan1.flow_id != scan2.flow_id + + # Union should NOT be in a flow (it's stateful - acts as a boundary) + assert union.flow_id is None + + # Project should be in its own flow + assert project.flow_id != scan1.flow_id + assert project.flow_id != scan2.flow_id + + +if __name__ == "__main__": # pragma: no cover + from tests.tools import run_tests + + run_tests() From e195a23e99a8324d0a228a12d819f1bd99b67082 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 4 Oct 2025 19:08:10 +0000 Subject: [PATCH 3/4] Add comprehensive documentation for flow identification Co-authored-by: joocer <1688479+joocer@users.noreply.github.com> --- opteryx/models/physical_plan.py | 37 +++++++++++++++++++++++++++++ opteryx/planner/physical_planner.py | 27 +++++++++++++++++++++ 2 files changed, 64 insertions(+) diff --git a/opteryx/models/physical_plan.py b/opteryx/models/physical_plan.py index c769a6eec..5b347c45e 100644 --- a/opteryx/models/physical_plan.py +++ b/opteryx/models/physical_plan.py @@ -5,6 +5,28 @@ """ The Physical Plan is a tree of nodes that represent the execution plan for a query. + +Flow/Subplan Identification: + The PhysicalPlan includes logic to identify "flows" or "subplans" - chains of + operations that can be executed together without intermediate checkpoints. + + A flow is a linear sequence of stateless operators. Flows break at: + - Stateful nodes (aggregates, sorts, etc.) + - Join nodes + - Branch points (multiple children) + - Merge points (multiple parents) + + Each node is annotated with a flow_id. Stateful/join nodes have flow_id=None + as they act as boundaries. This enables the parallel execution engine to send + entire flows to workers for efficient execution. + + Example: + Scan -> Filter -> Project -> Aggregate -> Limit + + Creates 2 flows: + - Flow 0: [Scan, Filter, Project] + - Aggregate: flow_id=None (boundary) + - Flow 1: [Limit] """ from typing import Optional @@ -102,7 +124,22 @@ def identify_flows(self): Stateful/join nodes are NOT included in flows - they act as boundaries. + Example: + For a plan: Scan -> Filter -> Project -> Aggregate -> Limit + + This would create 2 flows: + - Flow 0: [Scan, Filter, Project] (all stateless, linear chain) + - Flow 1: [Limit] (after the stateful Aggregate boundary) + - Aggregate has flow_id=None (boundary node) + + The parallel engine can send Flow 0 to a worker to execute all three + operations together, then the Aggregate runs serially, then Flow 1 can + be sent to a worker. + This method annotates each node with a flow_id to indicate which flow it belongs to. + + Returns: + int: The number of flows identified in the plan. """ from orso.tools import random_string diff --git a/opteryx/planner/physical_planner.py b/opteryx/planner/physical_planner.py index 4326b857d..c0d5d816d 100644 --- a/opteryx/planner/physical_planner.py +++ b/opteryx/planner/physical_planner.py @@ -15,6 +15,33 @@ def create_physical_plan(logical_plan, query_properties) -> PhysicalPlan: + """ + Creates a physical execution plan from a logical plan. + + The physical plan is a directed acyclic graph (DAG) of operator nodes that + represent the actual execution steps. After creating the plan, this function + identifies "flows" or "subplans" - chains of operations that can be executed + together without needing to report back interim snapshots. + + Flows are identified as follows: + - A flow is a sequence of stateless operators in a linear chain + - Flows break at: + * Stateful nodes (aggregates, joins, etc.) that require accumulation + * Branch points where one node has multiple children + * Merge points where one node has multiple parents + - Stateful/join nodes act as flow boundaries and are not part of any flow + + Each node in the physical plan is annotated with a flow_id to indicate which + flow it belongs to (or None if it's a boundary node). This enables the parallel + execution engine to send entire flows to workers for efficient execution. + + Parameters: + logical_plan: The logical plan to convert + query_properties: Query execution properties + + Returns: + PhysicalPlan: The physical execution plan with flow annotations + """ plan = PhysicalPlan() for nid, logical_node in logical_plan.nodes(data=True): From 13a96c8621f2668dd4314fa5c4585f62343a63d5 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 4 Oct 2025 19:12:17 +0000 Subject: [PATCH 4/4] Fix flow identification to use BFS for correct parent-child ordering Co-authored-by: joocer <1688479+joocer@users.noreply.github.com> --- opteryx/models/physical_plan.py | 25 +++++- .../test_flow_identification.py | 77 +++++++++++++++++++ 2 files changed, 99 insertions(+), 3 deletions(-) diff --git a/opteryx/models/physical_plan.py b/opteryx/models/physical_plan.py index 5b347c45e..513b0757c 100644 --- a/opteryx/models/physical_plan.py +++ b/opteryx/models/physical_plan.py @@ -148,19 +148,33 @@ def identify_flows(self): node = self[nid] node.flow_id = None - visited = set() flow_counter = 0 - # Process nodes in depth-first order - for nid, node in self.depth_first_search_flat(): + # Process all nodes - we need to do this in forward direction (from entries to exits) + # to properly propagate flow IDs + entry_points = self.get_entry_points() + + # Use a queue for breadth-first processing to ensure parents are processed before children + from collections import deque + queue = deque(entry_points) + visited = set() + + while queue: + nid = queue.popleft() + if nid in visited: continue visited.add(nid) + node = self[nid] # Stateful nodes and joins don't belong to flows - they are flow boundaries if not node.is_stateless or node.is_join: # Mark as visited but don't assign to a flow + # Add children to queue + for _, child, _ in self.outgoing_edges(nid): + if child not in visited: + queue.append(child) continue # Check if this node can start or continue a flow @@ -195,6 +209,11 @@ def identify_flows(self): # Assign this node to the flow node.flow_id = current_flow_id + + # Add children to queue + for _, child, _ in self.outgoing_edges(nid): + if child not in visited: + queue.append(child) return flow_counter diff --git a/tests/query_execution/test_flow_identification.py b/tests/query_execution/test_flow_identification.py index d6280648e..f45d9a7f1 100644 --- a/tests/query_execution/test_flow_identification.py +++ b/tests/query_execution/test_flow_identification.py @@ -182,6 +182,83 @@ def test_merge_breaks_flow(): assert project.flow_id != scan2.flow_id +def test_complex_query_flow(): + """Test flow identification on a more complex query structure.""" + plan = PhysicalPlan() + + # Create a complex plan representing: + # SELECT ... FROM table1 JOIN table2 WHERE ... GROUP BY ... ORDER BY ... LIMIT + # + # Scan1 -> Filter1 \ + # Join -> Project -> Aggregate -> Sort -> Limit + # Scan2 -> Filter2 / + + scan1 = MockNode(is_stateless=True, is_scan=True) + filter1 = MockNode(is_stateless=True) + scan2 = MockNode(is_stateless=True, is_scan=True) + filter2 = MockNode(is_stateless=True) + join = MockNode(is_stateless=False, is_join=True) + project = MockNode(is_stateless=True) + aggregate = MockNode(is_stateless=False) + sort = MockNode(is_stateless=False) + limit = MockNode(is_stateless=True) + + plan.add_node("scan1", scan1) + plan.add_node("filter1", filter1) + plan.add_node("scan2", scan2) + plan.add_node("filter2", filter2) + plan.add_node("join", join) + plan.add_node("project", project) + plan.add_node("aggregate", aggregate) + plan.add_node("sort", sort) + plan.add_node("limit", limit) + + # Left side of join + plan.add_edge("scan1", "filter1") + plan.add_edge("filter1", "join", "left") + + # Right side of join + plan.add_edge("scan2", "filter2") + plan.add_edge("filter2", "join", "right") + + # After join + plan.add_edge("join", "project") + plan.add_edge("project", "aggregate") + plan.add_edge("aggregate", "sort") + plan.add_edge("sort", "limit") + + # Identify flows + num_flows = plan.identify_flows() + + # Flow 0: scan1 -> filter1 (left side of join) + assert scan1.flow_id == filter1.flow_id + + # Flow 1: scan2 -> filter2 (right side of join) + assert scan2.flow_id == filter2.flow_id + + # Flows should be different for left and right + assert scan1.flow_id != scan2.flow_id + + # Join is a boundary + assert join.flow_id is None + + # Flow 2: project (after join, before aggregate) + assert project.flow_id is not None + assert project.flow_id != scan1.flow_id + assert project.flow_id != scan2.flow_id + + # Aggregate and Sort are boundaries + assert aggregate.flow_id is None + assert sort.flow_id is None + + # Flow 3: limit (after sort) + assert limit.flow_id is not None + assert limit.flow_id != project.flow_id + + # Should have at least 4 flows (left, right, project, limit) + assert num_flows >= 4 + + if __name__ == "__main__": # pragma: no cover from tests.tools import run_tests