Skip to content

Commit 9b8f31f

Browse files
feat: async task support (#4024)
* feat: add async support for tools, add async tool tests * chore: improve tool decorator typing * fix: ensure _run backward compat * chore: update docs * chore: make docstrings a little more readable * feat: add async execution support to agent executor * chore: add tests * feat: add aiosqlite dep; regenerate lockfile * feat: add async ops to memory feat; create tests * feat: async knowledge support; add tests * feat: add async task support * chore: dry out duplicate logic
1 parent d898d7c commit 9b8f31f

File tree

9 files changed

+1237
-171
lines changed

9 files changed

+1237
-171
lines changed

lib/crewai/src/crewai/agent/core.py

Lines changed: 253 additions & 161 deletions
Large diffs are not rendered by default.
Lines changed: 355 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,355 @@
1+
"""Utility functions for agent task execution.
2+
3+
This module contains shared logic extracted from the Agent's execute_task
4+
and aexecute_task methods to reduce code duplication.
5+
"""
6+
7+
from __future__ import annotations
8+
9+
import json
10+
from typing import TYPE_CHECKING, Any
11+
12+
from crewai.events.event_bus import crewai_event_bus
13+
from crewai.events.types.knowledge_events import (
14+
KnowledgeRetrievalCompletedEvent,
15+
KnowledgeRetrievalStartedEvent,
16+
KnowledgeSearchQueryFailedEvent,
17+
)
18+
from crewai.knowledge.utils.knowledge_utils import extract_knowledge_context
19+
from crewai.utilities.converter import generate_model_description
20+
21+
22+
if TYPE_CHECKING:
23+
from crewai.agent.core import Agent
24+
from crewai.task import Task
25+
from crewai.tools.base_tool import BaseTool
26+
from crewai.utilities.i18n import I18N
27+
28+
29+
def handle_reasoning(agent: Agent, task: Task) -> None:
30+
"""Handle the reasoning process for an agent before task execution.
31+
32+
Args:
33+
agent: The agent performing the task.
34+
task: The task to execute.
35+
"""
36+
if not agent.reasoning:
37+
return
38+
39+
try:
40+
from crewai.utilities.reasoning_handler import (
41+
AgentReasoning,
42+
AgentReasoningOutput,
43+
)
44+
45+
reasoning_handler = AgentReasoning(task=task, agent=agent)
46+
reasoning_output: AgentReasoningOutput = (
47+
reasoning_handler.handle_agent_reasoning()
48+
)
49+
task.description += f"\n\nReasoning Plan:\n{reasoning_output.plan.plan}"
50+
except Exception as e:
51+
agent._logger.log("error", f"Error during reasoning process: {e!s}")
52+
53+
54+
def build_task_prompt_with_schema(task: Task, task_prompt: str, i18n: I18N) -> str:
55+
"""Build task prompt with JSON/Pydantic schema instructions if applicable.
56+
57+
Args:
58+
task: The task being executed.
59+
task_prompt: The initial task prompt.
60+
i18n: Internationalization instance.
61+
62+
Returns:
63+
The task prompt potentially augmented with schema instructions.
64+
"""
65+
if (task.output_json or task.output_pydantic) and not task.response_model:
66+
if task.output_json:
67+
schema_dict = generate_model_description(task.output_json)
68+
schema = json.dumps(schema_dict["json_schema"]["schema"], indent=2)
69+
task_prompt += "\n" + i18n.slice("formatted_task_instructions").format(
70+
output_format=schema
71+
)
72+
elif task.output_pydantic:
73+
schema_dict = generate_model_description(task.output_pydantic)
74+
schema = json.dumps(schema_dict["json_schema"]["schema"], indent=2)
75+
task_prompt += "\n" + i18n.slice("formatted_task_instructions").format(
76+
output_format=schema
77+
)
78+
return task_prompt
79+
80+
81+
def format_task_with_context(task_prompt: str, context: str | None, i18n: I18N) -> str:
82+
"""Format task prompt with context if provided.
83+
84+
Args:
85+
task_prompt: The task prompt.
86+
context: Optional context string.
87+
i18n: Internationalization instance.
88+
89+
Returns:
90+
The task prompt formatted with context if provided.
91+
"""
92+
if context:
93+
return i18n.slice("task_with_context").format(task=task_prompt, context=context)
94+
return task_prompt
95+
96+
97+
def get_knowledge_config(agent: Agent) -> dict[str, Any]:
98+
"""Get knowledge configuration from agent.
99+
100+
Args:
101+
agent: The agent instance.
102+
103+
Returns:
104+
Dictionary of knowledge configuration.
105+
"""
106+
return agent.knowledge_config.model_dump() if agent.knowledge_config else {}
107+
108+
109+
def handle_knowledge_retrieval(
110+
agent: Agent,
111+
task: Task,
112+
task_prompt: str,
113+
knowledge_config: dict[str, Any],
114+
query_func: Any,
115+
crew_query_func: Any,
116+
) -> str:
117+
"""Handle knowledge retrieval for task execution.
118+
119+
This function handles both agent-specific and crew-specific knowledge queries.
120+
121+
Args:
122+
agent: The agent performing the task.
123+
task: The task being executed.
124+
task_prompt: The current task prompt.
125+
knowledge_config: Knowledge configuration dictionary.
126+
query_func: Function to query agent knowledge (sync or async).
127+
crew_query_func: Function to query crew knowledge (sync or async).
128+
129+
Returns:
130+
The task prompt potentially augmented with knowledge context.
131+
"""
132+
if not (agent.knowledge or (agent.crew and agent.crew.knowledge)):
133+
return task_prompt
134+
135+
crewai_event_bus.emit(
136+
agent,
137+
event=KnowledgeRetrievalStartedEvent(
138+
from_task=task,
139+
from_agent=agent,
140+
),
141+
)
142+
try:
143+
agent.knowledge_search_query = agent._get_knowledge_search_query(
144+
task_prompt, task
145+
)
146+
if agent.knowledge_search_query:
147+
if agent.knowledge:
148+
agent_knowledge_snippets = query_func(
149+
[agent.knowledge_search_query], **knowledge_config
150+
)
151+
if agent_knowledge_snippets:
152+
agent.agent_knowledge_context = extract_knowledge_context(
153+
agent_knowledge_snippets
154+
)
155+
if agent.agent_knowledge_context:
156+
task_prompt += agent.agent_knowledge_context
157+
158+
knowledge_snippets = crew_query_func(
159+
[agent.knowledge_search_query], **knowledge_config
160+
)
161+
if knowledge_snippets:
162+
agent.crew_knowledge_context = extract_knowledge_context(
163+
knowledge_snippets
164+
)
165+
if agent.crew_knowledge_context:
166+
task_prompt += agent.crew_knowledge_context
167+
168+
crewai_event_bus.emit(
169+
agent,
170+
event=KnowledgeRetrievalCompletedEvent(
171+
query=agent.knowledge_search_query,
172+
from_task=task,
173+
from_agent=agent,
174+
retrieved_knowledge=_combine_knowledge_context(agent),
175+
),
176+
)
177+
except Exception as e:
178+
crewai_event_bus.emit(
179+
agent,
180+
event=KnowledgeSearchQueryFailedEvent(
181+
query=agent.knowledge_search_query or "",
182+
error=str(e),
183+
from_task=task,
184+
from_agent=agent,
185+
),
186+
)
187+
return task_prompt
188+
189+
190+
def _combine_knowledge_context(agent: Agent) -> str:
191+
"""Combine agent and crew knowledge contexts into a single string.
192+
193+
Args:
194+
agent: The agent with knowledge contexts.
195+
196+
Returns:
197+
Combined knowledge context string.
198+
"""
199+
agent_ctx = agent.agent_knowledge_context or ""
200+
crew_ctx = agent.crew_knowledge_context or ""
201+
separator = "\n" if agent_ctx and crew_ctx else ""
202+
return agent_ctx + separator + crew_ctx
203+
204+
205+
def apply_training_data(agent: Agent, task_prompt: str) -> str:
206+
"""Apply training data to the task prompt.
207+
208+
Args:
209+
agent: The agent performing the task.
210+
task_prompt: The task prompt.
211+
212+
Returns:
213+
The task prompt with training data applied.
214+
"""
215+
if agent.crew and agent.crew._train:
216+
return agent._training_handler(task_prompt=task_prompt)
217+
return agent._use_trained_data(task_prompt=task_prompt)
218+
219+
220+
def process_tool_results(agent: Agent, result: Any) -> Any:
221+
"""Process tool results, returning result_as_answer if applicable.
222+
223+
Args:
224+
agent: The agent with tool results.
225+
result: The current result.
226+
227+
Returns:
228+
The final result, potentially overridden by tool result_as_answer.
229+
"""
230+
for tool_result in agent.tools_results:
231+
if tool_result.get("result_as_answer", False):
232+
result = tool_result["result"]
233+
return result
234+
235+
236+
def save_last_messages(agent: Agent) -> None:
237+
"""Save the last messages from agent executor.
238+
239+
Args:
240+
agent: The agent instance.
241+
"""
242+
agent._last_messages = (
243+
agent.agent_executor.messages.copy()
244+
if agent.agent_executor and hasattr(agent.agent_executor, "messages")
245+
else []
246+
)
247+
248+
249+
def prepare_tools(
250+
agent: Agent, tools: list[BaseTool] | None, task: Task
251+
) -> list[BaseTool]:
252+
"""Prepare tools for task execution and create agent executor.
253+
254+
Args:
255+
agent: The agent instance.
256+
tools: Optional list of tools.
257+
task: The task being executed.
258+
259+
Returns:
260+
The list of tools to use.
261+
"""
262+
final_tools = tools or agent.tools or []
263+
agent.create_agent_executor(tools=final_tools, task=task)
264+
return final_tools
265+
266+
267+
def validate_max_execution_time(max_execution_time: int | None) -> None:
268+
"""Validate max_execution_time parameter.
269+
270+
Args:
271+
max_execution_time: The maximum execution time to validate.
272+
273+
Raises:
274+
ValueError: If max_execution_time is not a positive integer.
275+
"""
276+
if max_execution_time is not None:
277+
if not isinstance(max_execution_time, int) or max_execution_time <= 0:
278+
raise ValueError(
279+
"Max Execution time must be a positive integer greater than zero"
280+
)
281+
282+
283+
async def ahandle_knowledge_retrieval(
284+
agent: Agent,
285+
task: Task,
286+
task_prompt: str,
287+
knowledge_config: dict[str, Any],
288+
) -> str:
289+
"""Handle async knowledge retrieval for task execution.
290+
291+
Args:
292+
agent: The agent performing the task.
293+
task: The task being executed.
294+
task_prompt: The current task prompt.
295+
knowledge_config: Knowledge configuration dictionary.
296+
297+
Returns:
298+
The task prompt potentially augmented with knowledge context.
299+
"""
300+
if not (agent.knowledge or (agent.crew and agent.crew.knowledge)):
301+
return task_prompt
302+
303+
crewai_event_bus.emit(
304+
agent,
305+
event=KnowledgeRetrievalStartedEvent(
306+
from_task=task,
307+
from_agent=agent,
308+
),
309+
)
310+
try:
311+
agent.knowledge_search_query = agent._get_knowledge_search_query(
312+
task_prompt, task
313+
)
314+
if agent.knowledge_search_query:
315+
if agent.knowledge:
316+
agent_knowledge_snippets = await agent.knowledge.aquery(
317+
[agent.knowledge_search_query], **knowledge_config
318+
)
319+
if agent_knowledge_snippets:
320+
agent.agent_knowledge_context = extract_knowledge_context(
321+
agent_knowledge_snippets
322+
)
323+
if agent.agent_knowledge_context:
324+
task_prompt += agent.agent_knowledge_context
325+
326+
knowledge_snippets = await agent.crew.aquery_knowledge(
327+
[agent.knowledge_search_query], **knowledge_config
328+
)
329+
if knowledge_snippets:
330+
agent.crew_knowledge_context = extract_knowledge_context(
331+
knowledge_snippets
332+
)
333+
if agent.crew_knowledge_context:
334+
task_prompt += agent.crew_knowledge_context
335+
336+
crewai_event_bus.emit(
337+
agent,
338+
event=KnowledgeRetrievalCompletedEvent(
339+
query=agent.knowledge_search_query,
340+
from_task=task,
341+
from_agent=agent,
342+
retrieved_knowledge=_combine_knowledge_context(agent),
343+
),
344+
)
345+
except Exception as e:
346+
crewai_event_bus.emit(
347+
agent,
348+
event=KnowledgeSearchQueryFailedEvent(
349+
query=agent.knowledge_search_query or "",
350+
error=str(e),
351+
from_task=task,
352+
from_agent=agent,
353+
),
354+
)
355+
return task_prompt

lib/crewai/src/crewai/agents/agent_builder/base_agent.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ def validate_mcps(
265265
if not mcps:
266266
return mcps
267267

268-
validated_mcps = []
268+
validated_mcps: list[str | MCPServerConfig] = []
269269
for mcp in mcps:
270270
if isinstance(mcp, str):
271271
if mcp.startswith(("https://", "crewai-amp:")):
@@ -347,6 +347,15 @@ def execute_task(
347347
) -> str:
348348
pass
349349

350+
@abstractmethod
351+
async def aexecute_task(
352+
self,
353+
task: Any,
354+
context: str | None = None,
355+
tools: list[BaseTool] | None = None,
356+
) -> str:
357+
"""Execute a task asynchronously."""
358+
350359
@abstractmethod
351360
def create_agent_executor(self, tools: list[BaseTool] | None = None) -> None:
352361
pass

0 commit comments

Comments
 (0)