Skip to content

Commit b9dd166

Browse files
Lorenze/agent executor flow pattern (#3975)
* WIP gh pr refactor: update agent executor handling and introduce flow-based executor * wip * refactor: clean up comments and improve code clarity in agent executor flow - Removed outdated comments and unnecessary explanations in and classes to enhance code readability. - Simplified parameter updates in the agent executor to avoid confusion regarding executor recreation. - Improved clarity in the method to ensure proper handling of non-final answers without raising errors. * bumping pytest-randomly numpy * also bump versions of anthropic sdk * ensure flow logs are not passed if its on executor * revert anthropic bump * fix * refactor: update dependency markers in uv.lock for platform compatibility - Enhanced dependency markers for , , , and others to ensure compatibility across different platforms (Linux, Darwin, and architecture-specific conditions). - Removed unnecessary event emission in the class during kickoff. - Cleaned up commented-out code in the class for better readability and maintainability. * drop dupllicate * test: enhance agent executor creation and stop word assertions - Added calls to create_agent_executor in multiple test cases to ensure proper agent execution setup. - Updated assertions for stop words in the agent tests to remove unnecessary checks and improve clarity. - Ensured consistency in task handling by invoking create_agent_executor with the appropriate task parameter. * refactor: reorganize agent executor imports and introduce CrewAgentExecutorFlow - Removed the old import of CrewAgentExecutorFlow and replaced it with the new import from the experimental module. - Updated relevant references in the codebase to ensure compatibility with the new structure. - Enhanced the organization of imports in core.py and base_agent.py for better clarity and maintainability. * updating name * dropped usage of printer here for rich console and dropped non-added value logging * address i18n * Enhance concurrency control in CrewAgentExecutorFlow by introducing a threading lock to prevent concurrent executions. This change ensures that the executor instance cannot be invoked while already running, improving stability and reliability during flow execution. * string literal returns * string literal returns * Enhance CrewAgentExecutor initialization by allowing optional i18n parameter for improved internationalization support. This change ensures that the executor can utilize a provided i18n instance or fallback to the default, enhancing flexibility in multilingual contexts. --------- Co-authored-by: Greyson LaLonde <[email protected]>
1 parent c73b36a commit b9dd166

File tree

16 files changed

+2024
-609
lines changed

16 files changed

+2024
-609
lines changed

lib/crewai/src/crewai/agent/core.py

Lines changed: 82 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from __future__ import annotations
22

33
import asyncio
4-
from collections.abc import Sequence
4+
from collections.abc import Callable, Sequence
55
import shutil
66
import subprocess
77
import time
@@ -44,6 +44,7 @@
4444
MemoryRetrievalCompletedEvent,
4545
MemoryRetrievalStartedEvent,
4646
)
47+
from crewai.experimental.crew_agent_executor_flow import CrewAgentExecutorFlow
4748
from crewai.knowledge.knowledge import Knowledge
4849
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
4950
from crewai.lite_agent import LiteAgent
@@ -105,7 +106,7 @@ class Agent(BaseAgent):
105106
The agent can also have memory, can operate in verbose mode, and can delegate tasks to other agents.
106107
107108
Attributes:
108-
agent_executor: An instance of the CrewAgentExecutor class.
109+
agent_executor: An instance of the CrewAgentExecutor or CrewAgentExecutorFlow class.
109110
role: The role of the agent.
110111
goal: The objective of the agent.
111112
backstory: The backstory of the agent.
@@ -221,6 +222,10 @@ class Agent(BaseAgent):
221222
default=None,
222223
description="A2A (Agent-to-Agent) configuration for delegating tasks to remote agents. Can be a single A2AConfig or a dict mapping agent IDs to configs.",
223224
)
225+
executor_class: type[CrewAgentExecutor] | type[CrewAgentExecutorFlow] = Field(
226+
default=CrewAgentExecutor,
227+
description="Class to use for the agent executor. Defaults to CrewAgentExecutor, can optionally use CrewAgentExecutorFlow.",
228+
)
224229

225230
@model_validator(mode="before")
226231
def validate_from_repository(cls, v: Any) -> dict[str, Any] | None | Any: # noqa: N805
@@ -721,29 +726,83 @@ def create_agent_executor(
721726
self.response_template.split("{{ .Response }}")[1].strip()
722727
)
723728

724-
self.agent_executor = CrewAgentExecutor(
725-
llm=self.llm, # type: ignore[arg-type]
726-
task=task, # type: ignore[arg-type]
727-
agent=self,
728-
crew=self.crew,
729-
tools=parsed_tools,
730-
prompt=prompt,
731-
original_tools=raw_tools,
732-
stop_words=stop_words,
733-
max_iter=self.max_iter,
734-
tools_handler=self.tools_handler,
735-
tools_names=get_tool_names(parsed_tools),
736-
tools_description=render_text_description_and_args(parsed_tools),
737-
step_callback=self.step_callback,
738-
function_calling_llm=self.function_calling_llm,
739-
respect_context_window=self.respect_context_window,
740-
request_within_rpm_limit=(
741-
self._rpm_controller.check_or_wait if self._rpm_controller else None
742-
),
743-
callbacks=[TokenCalcHandler(self._token_process)],
744-
response_model=task.response_model if task else None,
729+
rpm_limit_fn = (
730+
self._rpm_controller.check_or_wait if self._rpm_controller else None
745731
)
746732

733+
if self.agent_executor is not None:
734+
self._update_executor_parameters(
735+
task=task,
736+
tools=parsed_tools,
737+
raw_tools=raw_tools,
738+
prompt=prompt,
739+
stop_words=stop_words,
740+
rpm_limit_fn=rpm_limit_fn,
741+
)
742+
else:
743+
self.agent_executor = self.executor_class(
744+
llm=cast(BaseLLM, self.llm),
745+
task=task,
746+
i18n=self.i18n,
747+
agent=self,
748+
crew=self.crew,
749+
tools=parsed_tools,
750+
prompt=prompt,
751+
original_tools=raw_tools,
752+
stop_words=stop_words,
753+
max_iter=self.max_iter,
754+
tools_handler=self.tools_handler,
755+
tools_names=get_tool_names(parsed_tools),
756+
tools_description=render_text_description_and_args(parsed_tools),
757+
step_callback=self.step_callback,
758+
function_calling_llm=self.function_calling_llm,
759+
respect_context_window=self.respect_context_window,
760+
request_within_rpm_limit=rpm_limit_fn,
761+
callbacks=[TokenCalcHandler(self._token_process)],
762+
response_model=task.response_model if task else None,
763+
)
764+
765+
def _update_executor_parameters(
766+
self,
767+
task: Task | None,
768+
tools: list,
769+
raw_tools: list[BaseTool],
770+
prompt: dict,
771+
stop_words: list[str],
772+
rpm_limit_fn: Callable | None,
773+
) -> None:
774+
"""Update executor parameters without recreating instance.
775+
776+
Args:
777+
task: Task to execute.
778+
tools: Parsed tools.
779+
raw_tools: Original tools.
780+
prompt: Generated prompt.
781+
stop_words: Stop words list.
782+
rpm_limit_fn: RPM limit callback function.
783+
"""
784+
self.agent_executor.task = task
785+
self.agent_executor.tools = tools
786+
self.agent_executor.original_tools = raw_tools
787+
self.agent_executor.prompt = prompt
788+
self.agent_executor.stop = stop_words
789+
self.agent_executor.tools_names = get_tool_names(tools)
790+
self.agent_executor.tools_description = render_text_description_and_args(tools)
791+
self.agent_executor.response_model = task.response_model if task else None
792+
793+
self.agent_executor.tools_handler = self.tools_handler
794+
self.agent_executor.request_within_rpm_limit = rpm_limit_fn
795+
796+
if self.agent_executor.llm:
797+
existing_stop = getattr(self.agent_executor.llm, "stop", [])
798+
self.agent_executor.llm.stop = list(
799+
set(
800+
existing_stop + stop_words
801+
if isinstance(existing_stop, list)
802+
else stop_words
803+
)
804+
)
805+
747806
def get_delegation_tools(self, agents: list[BaseAgent]) -> list[BaseTool]:
748807
agent_tools = AgentTools(agents=agents)
749808
return agent_tools.tools()

lib/crewai/src/crewai/agents/agent_builder/base_agent.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,6 @@ def set_cache_handler(self, cache_handler: CacheHandler) -> None:
457457
if self.cache:
458458
self.cache_handler = cache_handler
459459
self.tools_handler.cache = cache_handler
460-
self.create_agent_executor()
461460

462461
def set_rpm_controller(self, rpm_controller: RPMController) -> None:
463462
"""Set the rpm controller for the agent.
@@ -467,7 +466,6 @@ def set_rpm_controller(self, rpm_controller: RPMController) -> None:
467466
"""
468467
if not self._rpm_controller:
469468
self._rpm_controller = rpm_controller
470-
self.create_agent_executor()
471469

472470
def set_knowledge(self, crew_embedder: EmbedderConfig | None = None) -> None:
473471
pass

lib/crewai/src/crewai/agents/crew_agent_executor.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ def __init__(
9191
request_within_rpm_limit: Callable[[], bool] | None = None,
9292
callbacks: list[Any] | None = None,
9393
response_model: type[BaseModel] | None = None,
94+
i18n: I18N | None = None,
9495
) -> None:
9596
"""Initialize executor.
9697
@@ -114,7 +115,7 @@ def __init__(
114115
callbacks: Optional callbacks list.
115116
response_model: Optional Pydantic model for structured outputs.
116117
"""
117-
self._i18n: I18N = get_i18n()
118+
self._i18n: I18N = i18n or get_i18n()
118119
self.llm = llm
119120
self.task = task
120121
self.agent = agent

lib/crewai/src/crewai/experimental/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from crewai.experimental.crew_agent_executor_flow import CrewAgentExecutorFlow
12
from crewai.experimental.evaluation import (
23
AgentEvaluationResult,
34
AgentEvaluator,
@@ -23,6 +24,7 @@
2324
"AgentEvaluationResult",
2425
"AgentEvaluator",
2526
"BaseEvaluator",
27+
"CrewAgentExecutorFlow",
2628
"EvaluationScore",
2729
"EvaluationTraceCallback",
2830
"ExperimentResult",

0 commit comments

Comments
 (0)