324 changes: 293 additions & 31 deletions lib/crewai/src/crewai/crew.py

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion lib/crewai/src/crewai/crews/crew_output.py
@@ -7,7 +7,7 @@

 from crewai.tasks.output_format import OutputFormat
 from crewai.tasks.task_output import TaskOutput
-from crewai.types.usage_metrics import UsageMetrics
+from crewai.types.usage_metrics import UsageMetrics, WorkflowTokenMetrics


 class CrewOutput(BaseModel):
@@ -26,6 +26,10 @@ class CrewOutput(BaseModel):
     token_usage: UsageMetrics = Field(
         description="Processed token summary", default_factory=UsageMetrics
     )
+    token_metrics: WorkflowTokenMetrics | None = Field(
+        description="Detailed per-agent and per-task token metrics",
+        default=None
+    )

     @property
     def json(self) -> str | None:  # type: ignore[override]
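Paired with the crew.py changes (not rendered above), the new field gives callers one place to read aggregate numbers after a run. A minimal sketch, assuming crew.py populates token_metrics; the agent and task definitions are placeholders, not part of this diff:

```python
from crewai import Agent, Crew, Task

# Placeholder crew; only the token_metrics access below is the point.
researcher = Agent(role="Researcher", goal="Collect facts", backstory="...")
summary = Task(description="Summarize X", expected_output="A short summary", agent=researcher)
crew = Crew(agents=[researcher], tasks=[summary])

result = crew.kickoff()

# token_metrics is Optional: it stays None unless detailed metrics were recorded.
if result.token_metrics is not None:
    print(f"workflow total: {result.token_metrics.total_tokens} tokens")
    for name, agent_metrics in result.token_metrics.per_agent.items():
        print(f"  {name}: {agent_metrics.total_tokens} tokens, {agent_metrics.successful_requests} requests")
    for name, task_metrics in result.token_metrics.per_task.items():
        print(f"  {name}: {task_metrics.total_tokens} tokens")
```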
71 changes: 62 additions & 9 deletions lib/crewai/src/crewai/task.py
@@ -1,5 +1,6 @@
 from __future__ import annotations

+from collections.abc import Callable
 from concurrent.futures import Future
 from copy import copy as shallow_copy
 import datetime
@@ -476,13 +477,34 @@ def execute_async(
         agent: BaseAgent | None = None,
         context: str | None = None,
         tools: list[BaseTool] | None = None,
-    ) -> Future[TaskOutput]:
-        """Execute the task asynchronously."""
-        future: Future[TaskOutput] = Future()
+        token_capture_callback: Callable[[], Any] | None = None,
+        agent_execution_lock: threading.Lock | None = None,
+    ) -> Future[TaskOutput | tuple[TaskOutput, Any, Any]]:
+        """Execute the task asynchronously.
+
+        Args:
+            agent: The agent to execute the task.
+            context: Context for the task execution.
+            tools: Tools available for the task.
+            token_capture_callback: Optional callback to capture token usage.
+                If provided, the future will return a tuple of
+                (TaskOutput, tokens_before, tokens_after) instead of just TaskOutput.
+                The callback is called twice: once before task execution (after
+                acquiring the lock if one is provided) and once after task completion.
+            agent_execution_lock: Optional lock to serialize task execution for
+                the same agent. This is used to ensure accurate per-task token
+                tracking when multiple async tasks from the same agent run
+                concurrently.
+
+        Returns:
+            Future containing TaskOutput, or tuple of (TaskOutput, tokens_before, tokens_after)
+            if token_capture_callback is provided.
+        """
+        future: Future[TaskOutput | tuple[TaskOutput, Any, Any]] = Future()
         threading.Thread(
             daemon=True,
             target=self._execute_task_async,
-            args=(agent, context, tools, future),
+            args=(agent, context, tools, future, token_capture_callback, agent_execution_lock),
         ).start()
         return future

@@ -491,14 +513,45 @@ def _execute_task_async(
         agent: BaseAgent | None,
         context: str | None,
         tools: list[Any] | None,
-        future: Future[TaskOutput],
+        future: Future[TaskOutput | tuple[TaskOutput, Any, Any]],
+        token_capture_callback: Callable[[], Any] | None = None,
+        agent_execution_lock: threading.Lock | None = None,
     ) -> None:
-        """Execute the task asynchronously with context handling."""
+        """Execute the task asynchronously with context handling.
+
+        If agent_execution_lock is provided, the task execution will be
+        serialized with other tasks using the same lock. This ensures
+        accurate per-task token tracking by:
+        1. Capturing tokens_before after acquiring the lock
+        2. Executing the task
+        3. Capturing tokens_after immediately after completion
+        4. Releasing the lock
+
+        If token_capture_callback is provided, it will be called twice:
+        once before task execution and once after, both while holding the lock.
+        """
         try:
-            result = self._execute_core(agent, context, tools)
-            future.set_result(result)
+            if agent_execution_lock:
+                with agent_execution_lock:
+                    if token_capture_callback:
+                        tokens_before = token_capture_callback()
+                    result = self._execute_core(agent, context, tools)
+                    if token_capture_callback:
+                        tokens_after = token_capture_callback()
+                        future.set_result((result, tokens_before, tokens_after))
+                    else:
+                        future.set_result(result)
+            else:
+                if token_capture_callback:
+                    tokens_before = token_capture_callback()
+                result = self._execute_core(agent, context, tools)
+                if token_capture_callback:
+                    tokens_after = token_capture_callback()
+                    future.set_result((result, tokens_before, tokens_after))
+                else:
+                    future.set_result(result)
         except Exception as e:
-            future.set_exception(e)
+            future.set_exception(e)

     async def aexecute_sync(
         self,
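To make the new contract concrete, here is a caller-side sketch. The real token_capture_callback and per-agent lock wiring live in crew.py (diff not rendered above); `task` and `agent` are assumed to be an existing Task and agent, and the counter is a stand-in for the agent's cumulative usage:

```python
import threading

# Stand-in for the agent's running token counter; the real callback is
# supplied by crew.py.
cumulative = {"total_tokens": 0}

def snapshot_tokens() -> dict[str, int]:
    return dict(cumulative)  # copy, so the before/after snapshots don't alias

agent_lock = threading.Lock()  # one lock per agent serializes its async tasks

future = task.execute_async(  # `task` and `agent` assumed defined elsewhere
    agent=agent,
    token_capture_callback=snapshot_tokens,
    agent_execution_lock=agent_lock,
)

# With a callback supplied, the future resolves to a 3-tuple instead of a
# bare TaskOutput.
output, tokens_before, tokens_after = future.result()
used = tokens_after["total_tokens"] - tokens_before["total_tokens"]
```

Serializing same-agent tasks behind the lock is what makes the before/after subtraction attributable to one task; without it, concurrent tasks sharing a counter would bleed into each other's deltas.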
6 changes: 6 additions & 0 deletions lib/crewai/src/crewai/tasks/task_output.py
@@ -6,6 +6,7 @@
 from pydantic import BaseModel, Field, model_validator

 from crewai.tasks.output_format import OutputFormat
+from crewai.types.usage_metrics import TaskTokenMetrics
 from crewai.utilities.types import LLMMessage


@@ -22,6 +23,7 @@ class TaskOutput(BaseModel):
         json_dict: JSON dictionary output of the task
         agent: Agent that executed the task
         output_format: Output format of the task (JSON, PYDANTIC, or RAW)
+        usage_metrics: Token usage metrics for this specific task
     """

     description: str = Field(description="Description of the task")
@@ -42,6 +44,10 @@ class TaskOutput(BaseModel):
         description="Output format of the task", default=OutputFormat.RAW
     )
     messages: list[LLMMessage] = Field(description="Messages of the task", default=[])
+    usage_metrics: TaskTokenMetrics | None = Field(
+        description="Token usage metrics for this task",
+        default=None
+    )

     @model_validator(mode="after")
     def set_summary(self):
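With the field in place, per-task numbers can be read straight off each TaskOutput. A sketch, assuming the workflow filled usage_metrics in and using the existing tasks_output list on CrewOutput:

```python
# `crew` as in the earlier sketch; usage_metrics stays None unless the
# workflow recorded per-task metrics.
result = crew.kickoff()
for task_output in result.tasks_output:
    metrics = task_output.usage_metrics
    if metrics is not None:
        print(
            f"{metrics.task_name} ({metrics.agent_name}): "
            f"{metrics.prompt_tokens} prompt + {metrics.completion_tokens} completion "
            f"= {metrics.total_tokens} tokens over {metrics.successful_requests} requests"
        )
```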
71 changes: 71 additions & 0 deletions lib/crewai/src/crewai/types/usage_metrics.py
@@ -44,3 +44,74 @@ def add_usage_metrics(self, usage_metrics: Self) -> None:
         self.cached_prompt_tokens += usage_metrics.cached_prompt_tokens
         self.completion_tokens += usage_metrics.completion_tokens
         self.successful_requests += usage_metrics.successful_requests
+
+
+class AgentTokenMetrics(BaseModel):
+    """Token usage metrics for a specific agent.
+
+    Attributes:
+        agent_name: Name/role of the agent
+        agent_id: Unique identifier for the agent
+        total_tokens: Total tokens used by this agent
+        prompt_tokens: Prompt tokens used by this agent
+        completion_tokens: Completion tokens used by this agent
+        successful_requests: Number of successful LLM requests
+    """
+
+    agent_name: str = Field(description="Name/role of the agent")
+    agent_id: str | None = Field(default=None, description="Unique identifier for the agent")
+    total_tokens: int = Field(default=0, description="Total tokens used by this agent")
+    prompt_tokens: int = Field(default=0, description="Prompt tokens used by this agent")
+    cached_prompt_tokens: int = Field(default=0, description="Cached prompt tokens used by this agent")
+    completion_tokens: int = Field(default=0, description="Completion tokens used by this agent")
+    successful_requests: int = Field(default=0, description="Number of successful LLM requests")
+
+
+class TaskTokenMetrics(BaseModel):
+    """Token usage metrics for a specific task.
+
+    Attributes:
+        task_name: Name of the task
+        task_id: Unique identifier for the task
+        agent_name: Name of the agent that executed the task
+        total_tokens: Total tokens used for this task
+        prompt_tokens: Prompt tokens used for this task
+        completion_tokens: Completion tokens used for this task
+        successful_requests: Number of successful LLM requests
+    """
+
+    task_name: str = Field(description="Name of the task")
+    task_id: str | None = Field(default=None, description="Unique identifier for the task")
+    agent_name: str = Field(description="Name of the agent that executed the task")
+    total_tokens: int = Field(default=0, description="Total tokens used for this task")
+    prompt_tokens: int = Field(default=0, description="Prompt tokens used for this task")
+    cached_prompt_tokens: int = Field(default=0, description="Cached prompt tokens used for this task")
+    completion_tokens: int = Field(default=0, description="Completion tokens used for this task")
+    successful_requests: int = Field(default=0, description="Number of successful LLM requests")
+
+
+class WorkflowTokenMetrics(BaseModel):
+    """Complete token usage metrics for a crew workflow.
+
+    Attributes:
+        total_tokens: Total tokens used across entire workflow
+        prompt_tokens: Total prompt tokens used
+        completion_tokens: Total completion tokens used
+        successful_requests: Total successful requests
+        per_agent: Dictionary mapping agent names to their token metrics
+        per_task: Dictionary mapping task names to their token metrics
+    """
+
+    total_tokens: int = Field(default=0, description="Total tokens used across entire workflow")
+    prompt_tokens: int = Field(default=0, description="Total prompt tokens used")
+    cached_prompt_tokens: int = Field(default=0, description="Total cached prompt tokens used")
+    completion_tokens: int = Field(default=0, description="Total completion tokens used")
+    successful_requests: int = Field(default=0, description="Total successful requests")
+    per_agent: dict[str, AgentTokenMetrics] = Field(
+        default_factory=dict,
+        description="Token metrics per agent"
+    )
+    per_task: dict[str, TaskTokenMetrics] = Field(
+        default_factory=dict,
+        description="Token metrics per task"
+    )