Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 93 additions & 20 deletions backend/app/api/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
from app.services.openclaw.gateway_dispatch import GatewayDispatchService
from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig
from app.services.openclaw.gateway_rpc import OpenClawGatewayError
from app.services.openclaw.provisioning_db import AgentLifecycleService
from app.services.organizations import require_board_access
from app.services.tags import (
TagState,
Expand Down Expand Up @@ -661,15 +662,57 @@ async def _latest_task_comment_by_agent(
return (await session.exec(statement)).first()


async def _wake_agent_online_for_task(
*,
session: AsyncSession,
board: Board,
task: Task,
agent: Agent,
reason: str,
) -> None:
if not agent.openclaw_session_id:
return
service = AgentLifecycleService(session)
try:
await service.commit_heartbeat(agent=agent, status_value="online")
record_activity(
session,
event_type="task.assignee_woken",
message=(f"Assignee heartbeat set online ({reason}): {agent.name}."),
agent_id=agent.id,
task_id=task.id,
board_id=board.id,
)
except Exception as exc: # pragma: no cover - best effort wake path
record_activity(
session,
event_type="task.assignee_wake_failed",
message=(f"Assignee wake failed ({reason}): {agent.name}. Error: {exc!s}"),
agent_id=agent.id,
task_id=task.id,
board_id=board.id,
)
await session.commit()


async def _notify_agent_on_task_assign(
*,
session: AsyncSession,
board: Board,
task: Task,
agent: Agent,
wake_assignee: bool = True,
) -> None:
if not agent.openclaw_session_id:
return
if wake_assignee:
await _wake_agent_online_for_task(
session=session,
board=board,
task=task,
agent=agent,
reason="assignment",
)
dispatch = GatewayDispatchService(session)
config = await dispatch.optional_gateway_config_for_board(board)
if config is None:
Expand Down Expand Up @@ -2121,15 +2164,23 @@ async def _lead_apply_status(
lead_agent = update.actor.agent
if "status" not in update.updates:
return
target_status = _required_status_value(update.updates["status"])
# Leads may set `in_progress` when simultaneously assigning an agent to an
# inbox task (assignment-and-start shortcut).
if update.task.status != "review":
assigning_agent = "assigned_agent_id" in update.updates and bool(
_optional_assigned_agent_id(update.updates["assigned_agent_id"])
)
if update.task.status == "inbox" and target_status == "in_progress" and assigning_agent:
update.task.status = target_status
return
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=(
"Lead status gate failed: board leads can only change status when the current "
f"task status is `review` (current: `{update.task.status}`)."
),
)
target_status = _required_status_value(update.updates["status"])
if target_status not in {"done", "inbox"}:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
Expand Down Expand Up @@ -2521,66 +2572,88 @@ async def _notify_task_update_assignment_changes(
*,
update: _TaskUpdateInput,
) -> None:
board: Board | None = None

async def _board() -> Board | None:
nonlocal board
if board is None and update.task.board_id:
board = await Board.objects.by_id(update.task.board_id).first(session)
return board

if (
update.task.status == "inbox"
and update.task.assigned_agent_id is None
and (update.previous_status != "inbox" or update.previous_assigned is not None)
):
board = (
await Board.objects.by_id(update.task.board_id).first(session)
if update.task.board_id
else None
)
if board:
current_board = await _board()
if current_board:
await _notify_lead_on_task_unassigned(
session=session,
board=board,
board=current_board,
task=update.task,
)

if (
not update.task.assigned_agent_id
or update.task.assigned_agent_id == update.previous_assigned
):
if not update.task.assigned_agent_id:
return

assigned_agent = await Agent.objects.by_id(update.task.assigned_agent_id).first(
session,
)
if assigned_agent is None:
return
board = (
await Board.objects.by_id(update.task.board_id).first(session)
if update.task.board_id
else None

assignment_changed = update.task.assigned_agent_id != update.previous_assigned
entered_in_progress = (
update.task.status == "in_progress" and update.previous_status != "in_progress"
)

if entered_in_progress and not assignment_changed:
current_board = await _board()
if current_board:
await _wake_agent_online_for_task(
session=session,
board=current_board,
task=update.task,
agent=assigned_agent,
reason="status_in_progress",
)

if not assignment_changed:
return

if (
update.previous_status == "review"
and update.task.status == "inbox"
and update.actor.actor_type == "agent"
and update.actor.agent
and update.actor.agent.is_board_lead
):
if board:
current_board = await _board()
if current_board:
await _notify_agent_on_task_rework(
session=session,
board=board,
board=current_board,
task=update.task,
agent=assigned_agent,
lead=update.actor.agent,
)
return

if (
update.actor.actor_type == "agent"
and update.actor.agent
and update.task.assigned_agent_id == update.actor.agent.id
):
return
if board:

current_board = await _board()
if current_board:
await _notify_agent_on_task_assign(
session=session,
board=board,
board=current_board,
task=update.task,
agent=assigned_agent,
wake_assignee=True,
)


Expand Down
Loading
Loading