fix: workflow incorrectly marked as completed while nodes are still executing#138
Conversation
Code Review by Qodo
1. print() in propagate_skip_from_edge
|
| if edge_states["has_taken"]: | ||
| # Enqueue node | ||
| # Start execution and enqueue node | ||
| print(f"Starting execution for node: {downstream_node_id}") |
There was a problem hiding this comment.
1. print() in propagate_skip_from_edge 📘 Rule violation ✧ Quality
• The backend module introduces a print() call, which bypasses the standard logging system and reduces production observability. • This violates the requirement to use a module-level logger instead of print(), making output harder to control, route, and structure consistently.
Agent prompt
## Issue description
Non-test backend code uses `print()` instead of the standard logging system.
## Issue Context
Compliance requires a module-level logger (e.g., `logger = logging.getLogger(__name__)`) and forbids `print()` in production backend modules.
## Fix Focus Areas
- api/core/workflow/graph_engine/graph_traversal/skip_propagator.py[5-65]
ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools
| def get_incoming_edges_side_effect(node_id): | ||
| if node_id == "node_2": | ||
| return [edge1] | ||
| elif node_id == "node_4": | ||
| return [edge3] | ||
| return [] | ||
|
|
||
| mock_graph.get_incoming_edges.side_effect = get_incoming_edges_side_effect | ||
|
|
||
| # Setup get_outgoing_edges to return different values based on node | ||
| def get_outgoing_edges_side_effect(node_id): | ||
| if node_id == "node_2": | ||
| return [edge3] | ||
| elif node_id == "node_4": | ||
| return [] # No outgoing edges, stops recursion | ||
| return [] |
There was a problem hiding this comment.
2. Untyped side-effect helper functions 📘 Rule violation ✓ Correctness
• The new test file defines local helper functions (get_incoming_edges_side_effect, get_outgoing_edges_side_effect) without parameter and return type annotations. • This violates the requirement that Python function definitions include type hints for all parameters and an explicit return type, reducing type safety and maintainability.
Agent prompt
## Issue description
Local helper functions in the new test file are missing parameter and return type annotations.
## Issue Context
Compliance requires type hints for all function parameters and return values (Python 3.12+ typing style).
## Fix Focus Areas
- api/tests/unit_tests/core/workflow/graph_engine/graph_traversal/test_skip_propagator.py[204-221]
ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools
| for edge in outgoing_edges: | ||
| self._state_manager.mark_edge_skipped(edge.id) | ||
| # Recursively propagate skip | ||
| self.propagate_skip_from_edge(edge.id) | ||
| self._state_manager.mark_edge_skipped(edge.id) |
There was a problem hiding this comment.
3. Skip propagation order bug 🐞 Bug ✓ Correctness
• _propagate_skip_to_node() now calls propagate_skip_from_edge() before marking the outgoing edge as SKIPPED, but propagate_skip_from_edge() is documented/implemented to operate “from a skipped edge”. • Because edge states default to UNKNOWN and propagate_skip_from_edge() returns early when any incoming edge is UNKNOWN, this reordering can prevent skip propagation from reaching downstream nodes, leaving them stuck in UNKNOWN.
Agent prompt
### Issue description
`_propagate_skip_to_node()` currently calls `propagate_skip_from_edge()` before marking the edge as skipped. But `propagate_skip_from_edge()` assumes it is called *from a skipped edge* and will return early if any incoming edge is `UNKNOWN`. Because edges default to `UNKNOWN`, this can stop skip propagation prematurely.
### Issue Context
The same module (`skip_branch_paths`) marks the edge skipped before propagating, indicating intended behavior.
### Fix Focus Areas
- api/core/workflow/graph_engine/graph_traversal/skip_propagator.py[81-87]
ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools
| self._state_manager.start_execution(downstream_node_id) | ||
| self._state_manager.enqueue_node(downstream_node_id) |
There was a problem hiding this comment.
4. Start before enqueue 🐞 Bug ⛯ Reliability
• SkipPropagator now calls start_execution() before enqueue_node(), unlike other engine call sites that enqueue first. • If enqueue_node() raises (e.g., missing node id), the executing set will already be mutated, which can keep is_execution_complete() false and leave the system in a partially-updated state.
Agent prompt
### Issue description
`SkipPropagator.propagate_skip_from_edge()` calls `start_execution()` before `enqueue_node()`, unlike other call sites. This can leave partially-updated execution tracking if `enqueue_node()` fails.
### Issue Context
Other engine call sites (event handlers, graph engine startup) enqueue first, then start tracking.
### Fix Focus Areas
- api/core/workflow/graph_engine/graph_traversal/skip_propagator.py[59-65]
- api/core/workflow/graph_engine/graph_state_manager.py[42-55]
- api/core/workflow/graph_engine/event_management/event_handlers.py[194-198]
ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools
| def test_propagate_skip_to_node_marks_node_and_outgoing_edges_skipped(self) -> None: | ||
| """_propagate_skip_to_node should mark node and all outgoing edges as skipped.""" | ||
| # Arrange | ||
| mock_graph = create_autospec(Graph) | ||
| mock_state_manager = create_autospec(GraphStateManager) | ||
|
|
||
| # Create outgoing edges | ||
| edge1 = MagicMock(spec=Edge) | ||
| edge1.id = "edge_2" | ||
| edge1.head = "node_downstream_1" # Set head for propagate_skip_from_edge | ||
|
|
||
| edge2 = MagicMock(spec=Edge) | ||
| edge2.id = "edge_3" | ||
| edge2.head = "node_downstream_2" | ||
|
|
||
| # Setup graph edges dict for propagate_skip_from_edge | ||
| mock_graph.edges = {"edge_2": edge1, "edge_3": edge2} | ||
| mock_graph.get_outgoing_edges.return_value = [edge1, edge2] | ||
|
|
||
| # Setup get_incoming_edges to return empty list to stop recursion | ||
| mock_graph.get_incoming_edges.return_value = [] | ||
|
|
||
| propagator = SkipPropagator(mock_graph, mock_state_manager) | ||
|
|
||
| # Use mock to call private method | ||
| # Act | ||
| propagator._propagate_skip_to_node("node_1") | ||
|
|
||
| # Assert | ||
| mock_state_manager.mark_node_skipped.assert_called_once_with("node_1") | ||
| mock_state_manager.mark_edge_skipped.assert_any_call("edge_2") | ||
| mock_state_manager.mark_edge_skipped.assert_any_call("edge_3") | ||
| assert mock_state_manager.mark_edge_skipped.call_count == 2 | ||
| # Should recursively propagate from each edge | ||
| # Since propagate_skip_from_edge is called, we need to verify it was called | ||
| # But we can't directly verify due to recursion. We'll trust the logic. |
There was a problem hiding this comment.
5. Tests don’t verify recursion 🐞 Bug ⛯ Reliability
• Some newly-added tests don’t assert that recursive skip propagation actually occurs; they explicitly state they “trust the logic”. • Several tests rely heavily on mocks and do not set up concrete edge states, reducing the chance they catch regressions in skip propagation behavior.
Agent prompt
### Issue description
Some tests do not verify recursive behavior and rely on mocks without concrete edge state setup, which can mask regressions.
### Issue Context
`propagate_skip_from_edge()` has early-return branches based on `edge_states["has_unknown"]` and other flags; tests should exercise these branches with real edge state values.
### Fix Focus Areas
- api/tests/unit_tests/core/workflow/graph_engine/graph_traversal/test_skip_propagator.py[115-185]
- api/core/workflow/graph_engine/graph_traversal/skip_propagator.py[45-97]
ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools
Benchmark PR from qodo-benchmark#441