Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion codeframe/agents/lead_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -1870,7 +1870,9 @@ async def _assign_and_execute_task(self, task: Task, retry_counts: Dict[int, int
# Mark agent busy
self.agent_pool_manager.mark_agent_busy(agent_id, task.id)

# Update task status to in_progress
# Update task with assigned agent and status (Issue #248 fix)
# Set assigned_to BEFORE status change so UI shows assignment immediately
self.db.update_task(task.id, {"assigned_to": agent_id})
self.db.update_task(task.id, {"status": "in_progress"})

# Get agent instance
Expand Down
128 changes: 128 additions & 0 deletions codeframe/ui/routers/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,3 +370,131 @@ async def approve_tasks(
excluded_count=len(excluded_tasks),
message=f"Successfully approved {len(approved_tasks)} tasks. Development phase started."
)


# ============================================================================
# Task Assignment Endpoint (Issue #248 - Manual trigger for stuck tasks)
# ============================================================================


class TaskAssignmentResponse(BaseModel):
"""Response model for task assignment."""
success: bool
pending_count: int
message: str


@project_router.post("/{project_id}/tasks/assign")
async def assign_pending_tasks(
project_id: int,
background_tasks: BackgroundTasks,
db: Database = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> TaskAssignmentResponse:
"""Manually trigger task assignment for pending unassigned tasks.

This endpoint allows users to restart the multi-agent execution process
when tasks are stuck in 'pending' state with no agent assigned. This can
happen when:
- User joins a session after the initial execution completed/failed
- The original execution timed out or crashed
- WebSocket messages were missed

Args:
project_id: Project ID
background_tasks: FastAPI background tasks for async execution
db: Database connection
current_user: Authenticated user

Returns:
TaskAssignmentResponse with pending task count and status

Raises:
HTTPException:
- 400: Project not in active phase
- 403: Access denied
- 404: Project not found
"""
# Verify project exists
project = db.get_project(project_id)
if not project:
raise HTTPException(
status_code=404,
detail=f"Project {project_id} not found"
)

# Authorization check
if not db.user_has_project_access(current_user.id, project_id):
raise HTTPException(status_code=403, detail="Access denied")

# Validate project is in active phase (development)
current_phase = project.get("phase", "discovery")
if current_phase != "active":
raise HTTPException(
status_code=400,
detail=f"Project must be in active (development) phase to assign tasks. Current phase: {current_phase}"
)

# Get all tasks and count pending unassigned ones
tasks = db.get_project_tasks(project_id)
pending_unassigned = [
t for t in tasks
if t.status == TaskStatus.PENDING and not t.assigned_to
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

t.status may be a string, so comparing to TaskStatus members (TaskStatus.PENDING / TaskStatus.IN_PROGRESS) will fail and skip tasks. Consider normalizing t.status to a TaskStatus before filtering, or compare to .value if statuses are stored as strings.

🚀 Want me to fix this? Reply ex: "fix it for me".

]
pending_count = len(pending_unassigned)

if pending_count == 0:
# Debug logging to help diagnose why tasks might appear stuck
logger.debug(
f"assign_pending_tasks called for project {project_id} but found 0 pending unassigned tasks. "
f"Total tasks: {len(tasks)}, statuses: {[t.status.value for t in tasks]}"
)
return TaskAssignmentResponse(
success=True,
pending_count=0,
message="No pending unassigned tasks to assign."
)

# Check if execution is already in progress (Phase 1 fix for concurrent execution)
# Include ASSIGNED status to prevent race between assignment and execution start
executing_tasks = [
t for t in tasks
if t.status in [TaskStatus.ASSIGNED, TaskStatus.IN_PROGRESS]
]
if executing_tasks:
logger.info(
f"⏳ Execution already in progress for project {project_id}: "
f"{len(executing_tasks)} tasks assigned/running"
)
return TaskAssignmentResponse(
success=True,
pending_count=pending_count,
message=f"Execution already in progress ({len(executing_tasks)} task(s) assigned/running). Please wait."
)

# Schedule multi-agent execution in background
api_key = os.environ.get("ANTHROPIC_API_KEY")
if not api_key:
logger.warning(
f"⚠️ ANTHROPIC_API_KEY not configured - cannot assign tasks for project {project_id}"
)
return TaskAssignmentResponse(
success=False,
pending_count=pending_count,
message="Cannot assign tasks: API key not configured. Please contact administrator."
)

background_tasks.add_task(
start_development_execution,
project_id,
db,
manager,
api_key
)
logger.info(f"✅ Scheduled task assignment for project {project_id} ({pending_count} pending tasks)")

return TaskAssignmentResponse(
success=True,
pending_count=pending_count,
message=f"Assignment started for {pending_count} pending task(s)."
)
226 changes: 226 additions & 0 deletions tests/integration/test_multi_agent_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -667,3 +667,229 @@ def record_usage(i: int):
total = cursor.fetchone()["total"]
expected_total = sum(100 + i for i in range(20))
assert total == expected_total


@pytest.mark.integration
class TestAssignAndExecuteTaskAssignedTo:
"""Tests for _assign_and_execute_task setting assigned_to field (Issue #248 fix)."""

@pytest.mark.asyncio
async def test_assign_and_execute_task_sets_assigned_to(
self, real_db: Database, test_workspace: Path
):
"""Test that _assign_and_execute_task sets the assigned_to field on the task.

This is a regression test for Issue #248 where tasks remained showing
'Assigned to: Unassigned' because the assigned_to field was never populated
during task execution.
"""
from codeframe.agents.lead_agent import LeadAgent

# Setup project
project_id = real_db.create_project(
name="assigned-to-test",
description="Test assigned_to field population",
source_type="empty",
workspace_path=str(test_workspace),
)

# Create issue and task
issue_id = real_db.create_issue({
"project_id": project_id,
"issue_number": "AT-001",
"title": "Test Issue",
"description": "Test issue for assigned_to",
"priority": 1,
"workflow_step": 1,
})

task_id = real_db.create_task_with_issue(
project_id=project_id,
issue_id=issue_id,
task_number="AT-001-1",
parent_issue_number="AT-001",
title="Test Task for Assignment",
description="This task should have assigned_to set",
status=TaskStatus.PENDING,
priority=1,
workflow_step=1,
can_parallelize=True,
)

# Verify task starts with no assigned_to
task_before = real_db.get_task(task_id)
assert task_before.assigned_to is None, "Task should start unassigned"

# Create LeadAgent with mocked execution
with patch.dict(os.environ, {"ANTHROPIC_API_KEY": "sk-ant-test-key"}):
with patch("codeframe.agents.lead_agent.AgentPoolManager") as mock_pool_class:
# Setup mock pool manager
mock_pool = Mock()
mock_pool.get_or_create_agent.return_value = "test-agent-001"
mock_pool.mark_agent_busy.return_value = None
mock_pool.mark_agent_idle.return_value = None
mock_pool.get_agent_status.return_value = {
"test-agent-001": {"status": "idle", "agent_type": "backend"}
}

# Mock agent instance with execute_task
mock_agent_instance = Mock()
mock_agent_instance.execute_task = AsyncMock(return_value={"status": "completed"})
mock_pool.get_agent_instance.return_value = mock_agent_instance

mock_pool_class.return_value = mock_pool

lead_agent = LeadAgent(
project_id=project_id,
db=real_db,
api_key="sk-ant-test-key",
ws_manager=None,
)
lead_agent.agent_pool_manager = mock_pool

# Also mock the review agent to avoid review step
with patch.object(lead_agent.agent_pool_manager, "get_or_create_agent") as mock_get_agent:
# First call returns worker agent, second call returns review agent
mock_get_agent.side_effect = ["test-agent-001", "review-agent-001"]

mock_review_instance = Mock()
mock_review_report = Mock()
mock_review_report.status = "approved"
mock_review_report.overall_score = 9.0
mock_review_instance.execute_task = AsyncMock(return_value=mock_review_report)

def get_instance_side_effect(agent_id):
if agent_id == "review-agent-001":
return mock_review_instance
return mock_agent_instance

mock_pool.get_agent_instance.side_effect = get_instance_side_effect

# Get the task object
task = real_db.get_task(task_id)

# Execute _assign_and_execute_task
retry_counts = {}
result = await lead_agent._assign_and_execute_task(task, retry_counts)

assert result is True, "Task execution should succeed"

# CRITICAL ASSERTION: Verify assigned_to was set
task_after = real_db.get_task(task_id)
assert task_after.assigned_to == "test-agent-001", (
f"Task assigned_to should be 'test-agent-001' but was '{task_after.assigned_to}'. "
"This is the Issue #248 bug - assigned_to field not being populated."
)

@pytest.mark.asyncio
async def test_assign_and_execute_task_sets_assigned_to_before_in_progress(
self, real_db: Database, test_workspace: Path
):
"""Test that assigned_to is set before status changes to in_progress.

The UI needs to show assignment even during the brief period before
task execution begins.
"""
from codeframe.agents.lead_agent import LeadAgent

# Setup project
project_id = real_db.create_project(
name="assigned-to-order-test",
description="Test assigned_to ordering",
source_type="empty",
workspace_path=str(test_workspace),
)

issue_id = real_db.create_issue({
"project_id": project_id,
"issue_number": "AO-001",
"title": "Order Test Issue",
"description": "Test",
"priority": 1,
"workflow_step": 1,
})

task_id = real_db.create_task_with_issue(
project_id=project_id,
issue_id=issue_id,
task_number="AO-001-1",
parent_issue_number="AO-001",
title="Order Test Task",
description="Test ordering",
status=TaskStatus.PENDING,
priority=1,
workflow_step=1,
can_parallelize=True,
)

# Track database update calls to verify ordering
update_calls = []
original_update_task = real_db.update_task

def tracking_update_task(task_id, updates):
update_calls.append((task_id, updates.copy()))
return original_update_task(task_id, updates)

with patch.dict(os.environ, {"ANTHROPIC_API_KEY": "sk-ant-test-key"}):
with patch("codeframe.agents.lead_agent.AgentPoolManager") as mock_pool_class:
mock_pool = Mock()
mock_pool.get_or_create_agent.return_value = "order-agent-001"
mock_pool.mark_agent_busy.return_value = None
mock_pool.mark_agent_idle.return_value = None
mock_pool.get_agent_status.return_value = {}

mock_agent_instance = Mock()
mock_agent_instance.execute_task = AsyncMock(return_value={"status": "completed"})

mock_review_instance = Mock()
mock_review_report = Mock()
mock_review_report.status = "approved"
mock_review_report.overall_score = 9.0
mock_review_instance.execute_task = AsyncMock(return_value=mock_review_report)

def get_agent_side_effect(agent_type):
if agent_type == "review":
return "review-agent-001"
return "order-agent-001"

mock_pool.get_or_create_agent.side_effect = get_agent_side_effect

def get_instance_side_effect(agent_id):
if agent_id == "review-agent-001":
return mock_review_instance
return mock_agent_instance

mock_pool.get_agent_instance.side_effect = get_instance_side_effect

mock_pool_class.return_value = mock_pool

lead_agent = LeadAgent(
project_id=project_id,
db=real_db,
api_key="sk-ant-test-key",
ws_manager=None,
)
lead_agent.agent_pool_manager = mock_pool

# Patch update_task to track calls
with patch.object(real_db, "update_task", side_effect=tracking_update_task):
task = real_db.get_task(task_id)
retry_counts = {}
await lead_agent._assign_and_execute_task(task, retry_counts)

# Verify update order: assigned_to should be set before or with in_progress
assigned_to_index = None
in_progress_index = None

for i, (tid, updates) in enumerate(update_calls):
if "assigned_to" in updates:
assigned_to_index = i
if updates.get("status") == "in_progress":
in_progress_index = i

assert assigned_to_index is not None, "assigned_to should be updated"
assert in_progress_index is not None, "status should be updated to in_progress"
assert assigned_to_index <= in_progress_index, (
f"assigned_to (call {assigned_to_index}) should be set before or with "
f"in_progress (call {in_progress_index})"
)
Loading
Loading