fix: workflow incorrectly marked as completed while nodes are still executing (#43)
Conversation
Greptile Overview

**Greptile Summary**

**Overview**

This PR attempts to fix a race condition where workflows are incorrectly marked as completed while nodes are still executing.

**Changes Made**

The fix reorders the node hand-off in `skip_propagator.py` so that `start_execution()` is called before `enqueue_node()`, and swaps the operations inside `enqueue_node()` so the node is put on the ready queue before its state is set to `TAKEN`.

**Root Cause of the Bug**

The workflow completion check (the dispatcher testing whether the ready queue is empty and no nodes are executing) could fire inside the hand-off window. A race condition existed where a downstream node was enqueued before `executing_nodes` was updated: a worker could dequeue the node immediately, leaving the queue empty while the executing count was still zero, so the dispatcher wrongly concluded the workflow was complete.

**Critical Issues Found**

**1. Order Inconsistency Across Codebase (Logic Bug)**

The new code in `skip_propagator.py` calls `start_execution()` before `enqueue_node()`, while every other call site uses the reverse order. This inconsistency creates maintenance burden and potential for subtle bugs. If the new order is correct for race condition prevention, all call sites should be updated.

**2. Duplicate Enqueueing Bug (Critical Logic Bug)**

Nodes with multiple incoming edges can be enqueued multiple times. When multiple edges become TAKEN, each triggers enqueueing without checking if the node is already queued/executing.

Scenario: Both incoming edges of a node are marked TAKEN, and each edge independently sees `has_taken=true` and enqueues the same node again.

Fix needed: Add state check in `enqueue_node()`:

```python
if self._graph.nodes[node_id].state != NodeState.TAKEN:
    self._ready_queue.put(node_id)
    self._graph.nodes[node_id].state = NodeState.TAKEN
```

**3. Operation Order Change in `enqueue_node()` (Logic Bug)**

Swapping `put()` and the state update means a failed `put()` leaves the node's state unset, and the reasoning behind the new order is undocumented.

**4. Test Gaps (Style Issue)**

Tests verify that both `start_execution()` and `enqueue_node()` are called, but not the order of the calls.

**Recommendation**

Do not merge as-is. While the PR addresses a real race condition, it introduces multiple critical bugs and inconsistencies that could cause nodes to execute multiple times or create subtle timing issues.

Confidence Score: 1/5
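The completion-check race can be modeled with a small simulation. The sketch below is a toy (names like `handoff` and `dispatcher_thinks_done` are illustrative, not the project's API): it replays both hand-off orders with an eager worker and checks whether the dispatcher could ever observe a false "complete" state.

```python
import queue

def dispatcher_thinks_done(q: queue.Queue, executing: int) -> bool:
    # The dispatcher's completion predicate: empty queue, nothing executing
    return q.empty() and executing == 0

def handoff(order: list[str]) -> bool:
    """Replay the enqueue/start hand-off with an eager worker that
    dequeues as soon as an item appears; return True if the dispatcher
    could observe a false 'workflow complete' state mid-hand-off."""
    q: queue.Queue = queue.Queue()
    executing = 0
    false_complete = False
    for step in order:
        if step == "enqueue":
            q.put("node")
            q.get()  # eager worker grabs the node immediately
        elif step == "start":
            executing += 1
        if dispatcher_thinks_done(q, executing):
            false_complete = True
    return false_complete

assert handoff(["enqueue", "start"]) is True    # old order: race window exists
assert handoff(["start", "enqueue"]) is False   # new order: window closed
```

This matches the reviewer's later observation that calling `start_execution()` first ensures `executing_nodes` is incremented before the queue can appear empty again.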
Important Files Changed

**File Analysis**
Sequence Diagram

```mermaid
sequenceDiagram
    participant D as Dispatcher
    participant EH as EventHandler
    participant EP as EdgeProcessor
    participant SP as SkipPropagator
    participant SM as StateManager
    participant RQ as ReadyQueue
    participant W as Worker

    Note over D,W: Node Completion Flow (Fixed)
    W->>EH: NodeRunSucceededEvent
    EH->>EP: process_node_success(node_id)
    EP->>SM: mark_edge_taken(edge_id)
    EP->>SM: is_node_ready(downstream_node)
    SM-->>EP: true (edge is TAKEN)
    EP-->>EH: [downstream_node]
    Note over EH,RQ: Critical Race Condition Window
    EH->>SM: enqueue_node(downstream_node)
    SM->>RQ: put(downstream_node)
    SM->>SM: set state=TAKEN
    EH->>SM: start_execution(downstream_node)
    SM->>SM: add to executing_nodes
    Note over D,RQ: Dispatcher checks completion
    D->>RQ: empty()?
    RQ-->>D: false (node in queue)
    D->>SM: get_executing_count()
    SM-->>D: 1 (node marked executing)
    Note over D: execution_complete = false ✓
    EH->>SM: finish_execution(completed_node)

    Note over D,W: Skip Propagation Flow (New Code)
    EP->>SP: skip_branch_paths(unselected_edges)
    SP->>SM: mark_edge_skipped(edge_id)
    SP->>SP: propagate_skip_from_edge(edge_id)
    SP->>SM: analyze_edge_states(incoming_edges)
    SM-->>SP: {has_taken: true, ...}
    Note over SP,RQ: NEW: start_execution called BEFORE enqueue
    SP->>SM: start_execution(downstream_node)
    SM->>SM: add to executing_nodes
    SP->>SM: enqueue_node(downstream_node)
    SM->>RQ: put(downstream_node)
    SM->>SM: set state=TAKEN
    Note over SP,SM: Potential Issue: Duplicate Enqueueing
    Note over SP: If multiple edges processed concurrently,<br/>node can be enqueued multiple times<br/>(no idempotency check)
```
In `api/core/workflow/graph_engine/graph_traversal/skip_propagator.py` (lines 62-63):

```python
self._state_manager.start_execution(downstream_node_id)
self._state_manager.enqueue_node(downstream_node_id)
```
The order of `start_execution()` before `enqueue_node()` is inconsistent with the rest of the codebase. Throughout the system, these calls follow the pattern:

1. `enqueue_node(node_id)` first
2. `start_execution(node_id)` second

**Evidence from existing code:**

- `event_handlers.py:196-197`: `enqueue_node` → `start_execution`
- `event_handlers.py:279-280`: `enqueue_node` → `start_execution`
- `event_handlers.py:309-310`: `enqueue_node` → `start_execution`
- `graph_engine.py:334-335`: `enqueue_node` → `start_execution`
- `graph_engine.py:338-339`: `enqueue_node` → `start_execution`

**Only this location uses the reverse order**, creating an inconsistency that could lead to subtle bugs or make the code harder to maintain. While calling `start_execution()` first may prevent a specific race condition (ensuring `executing_nodes` is incremented before the queue becomes non-empty), this should be done consistently across the entire codebase.
Suggested change:

```diff
- self._state_manager.start_execution(downstream_node_id)
- self._state_manager.enqueue_node(downstream_node_id)
+ # Enqueue node
+ self._state_manager.enqueue_node(downstream_node_id)
+ self._state_manager.start_execution(downstream_node_id)
```
If the reverse order is intentionally correct here (and it likely is for fixing the race condition), then ALL other call sites should be updated to match this pattern for consistency.
In `api/core/workflow/graph_engine/graph_state_manager.py` (lines 52-54):

```diff
  with self._lock:
-     self._graph.nodes[node_id].state = NodeState.TAKEN
      self._ready_queue.put(node_id)
+     self._graph.nodes[node_id].state = NodeState.TAKEN
```
The order of operations inside enqueue_node() was changed - put() is now called before setting the node state to TAKEN. While both operations are protected by self._lock, this change has implications:
Potential Issue: If ready_queue.put() raises an exception (e.g., if the queue implementation has capacity limits or validation), the node state will not be set to TAKEN, leaving the node in an inconsistent state where it may be in the queue but not marked properly.
**Original order was safer:**

```python
self._graph.nodes[node_id].state = NodeState.TAKEN  # Update state first
self._ready_queue.put(node_id)                      # Then enqueue
```

If the enqueue operation fails, the state is already updated. The worker will dequeue and execute. If state update happens AFTER enqueue and enqueue succeeds but state update fails (unlikely but possible with property setters), the node is queued but not marked TAKEN.

**Question:** What was the specific reason for changing this order? If it's to ensure the queue reflects reality before the state changes, please document this reasoning in a comment.
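The failure mode described above can be seen with a bounded queue (an assumption for this sketch; the real ready queue may be unbounded) and a hypothetical dict-based state store: a `put_nowait()` that raises before the state assignment leaves the node unmarked.

```python
import queue

states = {"node_1": "UNKNOWN"}
ready: queue.Queue = queue.Queue(maxsize=1)
ready.put_nowait("other_node")      # fill the bounded queue to capacity

# New order inside enqueue_node(): put first, then mark TAKEN
try:
    ready.put_nowait("node_1")      # raises queue.Full
    states["node_1"] = "TAKEN"      # never reached
except queue.Full:
    pass

# The node's state was never set, so nothing records the failed enqueue
assert states["node_1"] == "UNKNOWN"
```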
In `api/tests/unit_tests/core/workflow/graph_engine/graph_traversal/test_skip_propagator.py` (lines 80-81):

```python
mock_state_manager.start_execution.assert_called_once_with("node_2")
mock_state_manager.enqueue_node.assert_called_once_with("node_2")
```
The test verifies that both start_execution() and enqueue_node() are called, but does NOT verify the order of these calls. Since the order matters for preventing race conditions (as evidenced by the fix in this PR), the test should explicitly verify the calling order.
Consider verifying the exact sequence, e.g. with `assert_has_calls()` or by inspecting `method_calls`:
Suggested change:

```diff
- mock_state_manager.start_execution.assert_called_once_with("node_2")
- mock_state_manager.enqueue_node.assert_called_once_with("node_2")
+ # Assert - verify both the calls and their order
+ mock_state_manager.start_execution.assert_called_once_with("node_2")
+ mock_state_manager.enqueue_node.assert_called_once_with("node_2")
+ # Verify order: start_execution must be called before enqueue_node
+ calls = mock_state_manager.method_calls
+ start_idx = next(i for i, call in enumerate(calls) if call[0] == 'start_execution')
+ enqueue_idx = next(i for i, call in enumerate(calls) if call[0] == 'enqueue_node')
+ assert start_idx < enqueue_idx, "start_execution should be called before enqueue_node"
```
This ensures the fix for the race condition is maintained in future changes.
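As a standalone illustration of the order check (a bare `MagicMock` here, not the actual test fixture): child calls on a parent mock are recorded in order in `mock_calls`, so `assert_has_calls()` on the parent verifies the sequence.

```python
from unittest.mock import MagicMock, call

mock_state_manager = MagicMock()

# Simulate the code under test invoking the calls in the fixed order
mock_state_manager.start_execution("node_2")
mock_state_manager.enqueue_node("node_2")

# Passes only when the calls appear in this exact sequence
mock_state_manager.assert_has_calls([
    call.start_execution("node_2"),
    call.enqueue_node("node_2"),
])
```

Swapping the expected list makes `assert_has_calls()` raise `AssertionError`, so a future reordering of the production code would fail the test.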
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
In `api/core/workflow/graph_engine/graph_traversal/skip_propagator.py` (lines 59-64):

```python
# If any edge is taken, node may still execute
if edge_states["has_taken"]:
    # Enqueue node
    self._state_manager.start_execution(downstream_node_id)
    self._state_manager.enqueue_node(downstream_node_id)
    return
```
Critical Bug: Node can be enqueued multiple times
When a node has multiple incoming edges that are processed as TAKEN, propagate_skip_from_edge() will be called once per edge. Each call checks has_taken and enqueues the node again, causing the same node to appear in the ready queue multiple times.
**Example Scenario:**

```
Node A has 2 incoming edges (edge1, edge2)
1. Edge1 marked TAKEN → propagate_skip_from_edge(edge1) called
   - Sees has_taken=true → enqueues Node A
2. Edge2 marked TAKEN → propagate_skip_from_edge(edge2) called
   - Sees has_taken=true → enqueues Node A AGAIN
```
Result: Node A is in the queue twice, will be executed twice by workers.
Fix: Check if the node is already enqueued/executing before enqueueing:
Suggested change:

```diff
  # If any edge is taken, node may still execute
  if edge_states["has_taken"]:
-     # Enqueue node
-     self._state_manager.start_execution(downstream_node_id)
-     self._state_manager.enqueue_node(downstream_node_id)
+     # Only enqueue if not already enqueued or executing
+     if not self._state_manager.is_executing(downstream_node_id):
+         self._state_manager.start_execution(downstream_node_id)
+         self._state_manager.enqueue_node(downstream_node_id)
      return
```
Alternatively, check the node state to prevent duplicate enqueueing:

```python
if self._state_manager.get_node_state(downstream_node_id) == NodeState.UNKNOWN:
    # Node hasn't been processed yet
    self._state_manager.start_execution(downstream_node_id)
    self._state_manager.enqueue_node(downstream_node_id)
```
Additional Comments (1)
**Pre-existing Issue: No protection against duplicate enqueueing**

Path: `api/core/workflow/graph_engine/graph_state_manager.py` (lines 42-54)

The `enqueue_node()` method doesn't check if a node is already enqueued (state=TAKEN) before adding it to the ready queue. This can cause nodes to be executed multiple times when they have multiple incoming edges.

**Scenario:**

```
NodeA --edge1--> NodeC
NodeB --edge2--> NodeC
1. NodeA completes → edge1 marked TAKEN → NodeC is ready → enqueued (state=TAKEN)
2. NodeB completes → edge2 marked TAKEN → NodeC still reports as ready → enqueued AGAIN
```

Result: NodeC appears in the queue twice and will be executed by two workers.

**Recommendation:** Add idempotency check:

```python
def enqueue_node(self, node_id: str) -> None:
    """
    Mark a node as TAKEN and add it to the ready queue.

    This combines the state transition and enqueueing operations
    that always occur together when preparing a node for execution.

    Args:
        node_id: The ID of the node to enqueue
    """
    with self._lock:
        # Only enqueue if not already enqueued
        if self._graph.nodes[node_id].state != NodeState.TAKEN:
            self._ready_queue.put(node_id)
            self._graph.nodes[node_id].state = NodeState.TAKEN
```

Note: This is a pre-existing issue, not introduced by this PR, but should be addressed.
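The suggested idempotency check can be exercised with a toy harness (hypothetical container names; only the lock-guarded state check mirrors the recommendation):

```python
import queue
import threading
from enum import Enum, auto

class NodeState(Enum):
    UNKNOWN = auto()
    TAKEN = auto()

class IdempotentStateManager:
    """Toy harness for the suggested idempotent enqueue; not the real class."""
    def __init__(self, node_ids: list[str]) -> None:
        self._lock = threading.Lock()
        self._node_states = {n: NodeState.UNKNOWN for n in node_ids}
        self._ready_queue: queue.Queue = queue.Queue()

    def enqueue_node(self, node_id: str) -> None:
        with self._lock:
            # Only enqueue if not already enqueued
            if self._node_states[node_id] != NodeState.TAKEN:
                self._ready_queue.put(node_id)
                self._node_states[node_id] = NodeState.TAKEN

    def queued(self) -> int:
        return self._ready_queue.qsize()

sm = IdempotentStateManager(["node_c"])
sm.enqueue_node("node_c")   # edge1 from NodeA marked TAKEN
sm.enqueue_node("node_c")   # edge2 from NodeB marked TAKEN -> no-op
assert sm.queued() == 1     # NodeC enqueued exactly once
```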
Benchmark PR from qodo-benchmark#133