Skip to content

Commit 633e279

Browse files
feat: add async support for tools and agent executor; improve typing and docs
Introduces async tool support with new tests, adds async execution to the agent executor, improves tool decorator typing, ensures _run backward compatibility, updates docs and docstrings, and adds additional tests.
1 parent a257789 commit 633e279

File tree

3 files changed

+563
-12
lines changed

3 files changed

+563
-12
lines changed

lib/crewai/src/crewai/agents/crew_agent_executor.py

Lines changed: 155 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
get_before_llm_call_hooks,
2929
)
3030
from crewai.utilities.agent_utils import (
31+
aget_llm_response,
3132
enforce_rpm_limit,
3233
format_message_for_llm,
3334
get_llm_response,
@@ -43,7 +44,10 @@
4344
from crewai.utilities.constants import TRAINING_DATA_FILE
4445
from crewai.utilities.i18n import I18N, get_i18n
4546
from crewai.utilities.printer import Printer
46-
from crewai.utilities.tool_utils import execute_tool_and_check_finality
47+
from crewai.utilities.tool_utils import (
48+
aexecute_tool_and_check_finality,
49+
execute_tool_and_check_finality,
50+
)
4751
from crewai.utilities.training_handler import CrewTrainingHandler
4852

4953

@@ -134,8 +138,8 @@ def __init__(
134138
self.messages: list[LLMMessage] = []
135139
self.iterations = 0
136140
self.log_error_after = 3
137-
self.before_llm_call_hooks: list[Callable] = []
138-
self.after_llm_call_hooks: list[Callable] = []
141+
self.before_llm_call_hooks: list[Callable[..., Any]] = []
142+
self.after_llm_call_hooks: list[Callable[..., Any]] = []
139143
self.before_llm_call_hooks.extend(get_before_llm_call_hooks())
140144
self.after_llm_call_hooks.extend(get_after_llm_call_hooks())
141145
if self.llm:
@@ -312,6 +316,154 @@ def _invoke_loop(self) -> AgentFinish:
312316
self._show_logs(formatted_answer)
313317
return formatted_answer
314318

319+
async def ainvoke(self, inputs: dict[str, Any]) -> dict[str, Any]:
    """Run the agent asynchronously with the given inputs.

    Args:
        inputs: Prompt variables used to render the system/user prompts,
            plus an optional "ask_for_human_input" flag.

    Returns:
        Dictionary with a single "output" key holding the agent's answer.
    """
    # Render the configured prompts and seed the conversation history.
    if "system" in self.prompt:
        rendered_system = self._format_prompt(
            cast(str, self.prompt.get("system", "")), inputs
        )
        rendered_user = self._format_prompt(
            cast(str, self.prompt.get("user", "")), inputs
        )
        self.messages.append(
            format_message_for_llm(rendered_system, role="system")
        )
        self.messages.append(format_message_for_llm(rendered_user))
    else:
        rendered_user = self._format_prompt(self.prompt.get("prompt", ""), inputs)
        self.messages.append(format_message_for_llm(rendered_user))

    self._show_start_logs()

    self.ask_for_human_input = bool(inputs.get("ask_for_human_input", False))

    try:
        final_answer = await self._ainvoke_loop()
    except AssertionError:
        self._printer.print(
            content="Agent failed to reach a final answer. This is likely a bug - please report it.",
            color="red",
        )
        raise
    except Exception as exc:
        handle_unknown_error(self._printer, exc)
        raise

    if self.ask_for_human_input:
        final_answer = self._handle_human_feedback(final_answer)

    # Persist the final result into the agent's memory stores.
    self._create_short_term_memory(final_answer)
    self._create_long_term_memory(final_answer)
    self._create_external_memory(final_answer)
    return {"output": final_answer.output}
365+
async def _ainvoke_loop(self) -> AgentFinish:
    """Execute the agent loop asynchronously until completion.

    Mirrors the synchronous `_invoke_loop`: repeatedly calls the LLM,
    dispatches tool actions, and handles parser/context-length errors
    until an `AgentFinish` is produced or max iterations is reached.

    Returns:
        Final answer from the agent.

    Raises:
        RuntimeError: If the loop exits without an `AgentFinish`.
    """
    formatted_answer = None
    while not isinstance(formatted_answer, AgentFinish):
        try:
            if has_reached_max_iterations(self.iterations, self.max_iter):
                # Force a final answer once the iteration budget is spent.
                formatted_answer = handle_max_iterations_exceeded(
                    formatted_answer,
                    printer=self._printer,
                    i18n=self._i18n,
                    messages=self.messages,
                    llm=self.llm,
                    callbacks=self.callbacks,
                )
                break

            enforce_rpm_limit(self.request_within_rpm_limit)

            answer = await aget_llm_response(
                llm=self.llm,
                messages=self.messages,
                callbacks=self.callbacks,
                printer=self._printer,
                from_task=self.task,
                from_agent=self.agent,
                response_model=self.response_model,
                executor_context=self,
            )
            formatted_answer = process_llm_response(answer, self.use_stop_words)  # type: ignore[assignment]

            if isinstance(formatted_answer, AgentAction):
                # Attach the agent's fingerprint (when available) so tool
                # execution can be attributed to this agent.
                fingerprint_context = {}
                if (
                    self.agent
                    and hasattr(self.agent, "security_config")
                    and hasattr(self.agent.security_config, "fingerprint")
                ):
                    fingerprint_context = {
                        "agent_fingerprint": str(
                            self.agent.security_config.fingerprint
                        )
                    }

                tool_result = await aexecute_tool_and_check_finality(
                    agent_action=formatted_answer,
                    fingerprint_context=fingerprint_context,
                    tools=self.tools,
                    i18n=self._i18n,
                    agent_key=self.agent.key if self.agent else None,
                    agent_role=self.agent.role if self.agent else None,
                    tools_handler=self.tools_handler,
                    task=self.task,
                    agent=self.agent,
                    function_calling_llm=self.function_calling_llm,
                    crew=self.crew,
                )
                formatted_answer = self._handle_agent_action(
                    formatted_answer, tool_result
                )

            self._invoke_step_callback(formatted_answer)  # type: ignore[arg-type]
            self._append_message(formatted_answer.text)  # type: ignore[union-attr,attr-defined]

        except OutputParserError as e:
            # Malformed LLM output: nudge the model and retry.
            formatted_answer = handle_output_parser_exception(  # type: ignore[assignment]
                e=e,
                messages=self.messages,
                iterations=self.iterations,
                log_error_after=self.log_error_after,
                printer=self._printer,
            )

        except Exception as e:
            if e.__class__.__module__.startswith("litellm"):
                # Bare re-raise preserves the original traceback
                # (was `raise e`, which adds a redundant frame).
                raise
            if is_context_length_exceeded(e):
                handle_context_length(
                    respect_context_window=self.respect_context_window,
                    printer=self._printer,
                    messages=self.messages,
                    llm=self.llm,
                    callbacks=self.callbacks,
                    i18n=self._i18n,
                )
                continue
            handle_unknown_error(self._printer, e)
            raise
        finally:
            self.iterations += 1

    if not isinstance(formatted_answer, AgentFinish):
        raise RuntimeError(
            "Agent execution ended without reaching a final answer. "
            f"Got {type(formatted_answer).__name__} instead of AgentFinish."
        )
    self._show_logs(formatted_answer)
    return formatted_answer
315467
def _handle_agent_action(
316468
self, formatted_answer: AgentAction, tool_result: ToolResult
317469
) -> AgentAction | AgentFinish:

lib/crewai/src/crewai/utilities/agent_utils.py

Lines changed: 63 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -242,17 +242,17 @@ def get_llm_response(
242242
"""Call the LLM and return the response, handling any invalid responses.
243243
244244
Args:
245-
llm: The LLM instance to call
246-
messages: The messages to send to the LLM
247-
callbacks: List of callbacks for the LLM call
248-
printer: Printer instance for output
249-
from_task: Optional task context for the LLM call
250-
from_agent: Optional agent context for the LLM call
251-
response_model: Optional Pydantic model for structured outputs
252-
executor_context: Optional executor context for hook invocation
245+
llm: The LLM instance to call.
246+
messages: The messages to send to the LLM.
247+
callbacks: List of callbacks for the LLM call.
248+
printer: Printer instance for output.
249+
from_task: Optional task context for the LLM call.
250+
from_agent: Optional agent context for the LLM call.
251+
response_model: Optional Pydantic model for structured outputs.
252+
executor_context: Optional executor context for hook invocation.
253253
254254
Returns:
255-
The response from the LLM as a string
255+
The response from the LLM as a string.
256256
257257
Raises:
258258
Exception: If an error occurs.
@@ -284,6 +284,60 @@ def get_llm_response(
284284
return _setup_after_llm_call_hooks(executor_context, answer, printer)
285285

286286

287+
async def aget_llm_response(
    llm: LLM | BaseLLM,
    messages: list[LLMMessage],
    callbacks: list[TokenCalcHandler],
    printer: Printer,
    from_task: Task | None = None,
    from_agent: Agent | LiteAgent | None = None,
    response_model: type[BaseModel] | None = None,
    executor_context: CrewAgentExecutor | None = None,
) -> str:
    """Call the LLM asynchronously and return the response.

    Async counterpart of `get_llm_response`, delegating to `llm.acall`.

    Args:
        llm: The LLM instance to call.
        messages: The messages to send to the LLM.
        callbacks: List of callbacks for the LLM call.
        printer: Printer instance for output.
        from_task: Optional task context for the LLM call.
        from_agent: Optional agent context for the LLM call.
        response_model: Optional Pydantic model for structured outputs.
        executor_context: Optional executor context for hook invocation.

    Returns:
        The response from the LLM as a string.

    Raises:
        Exception: If an error occurs during the LLM call.
        ValueError: If a before-hook blocks the call, or the response is
            None or empty.
    """
    if executor_context is not None:
        if not _setup_before_llm_call_hooks(executor_context, printer):
            raise ValueError("LLM call blocked by before_llm_call hook")
        # Hooks may have rewritten the message history; use the latest copy.
        messages = executor_context.messages

    # No try/except here: the previous `except Exception as e: raise e`
    # was a no-op that only added a redundant traceback frame.
    answer = await llm.acall(
        messages,
        callbacks=callbacks,
        from_task=from_task,
        from_agent=from_agent,  # type: ignore[arg-type]
        response_model=response_model,
    )
    if not answer:
        printer.print(
            content="Received None or empty response from LLM call.",
            color="red",
        )
        raise ValueError("Invalid response from LLM call - None or empty.")

    return _setup_after_llm_call_hooks(executor_context, answer, printer)
340+
287341
def process_llm_response(
288342
answer: str, use_stop_words: bool
289343
) -> AgentAction | AgentFinish:

0 commit comments

Comments
 (0)