Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ convention = "google"
[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_default_fixture_loop_scope = "function"
markers = [
"delegation: marks tests that cover delegation-specific behaviors",
]


[tool.coverage.run]
Expand Down
278 changes: 277 additions & 1 deletion src/strands/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
from ..types._events import AgentResultEvent, InitEventLoopEvent, ModelStreamChunkEvent, TypedEvent
from ..types.agent import AgentInput
from ..types.content import ContentBlock, Message, Messages
from ..types.exceptions import ContextWindowOverflowException
from ..types.exceptions import AgentDelegationException, ContextWindowOverflowException
from ..types.tools import ToolResult, ToolUse
from ..types.traces import AttributeValue
from .agent_result import AgentResult
Expand Down Expand Up @@ -225,6 +225,13 @@ def __init__(
hooks: Optional[list[HookProvider]] = None,
session_manager: Optional[SessionManager] = None,
tool_executor: Optional[ToolExecutor] = None,
sub_agents: Optional[list["Agent"]] = None,
delegation_timeout: Optional[float] = 300.0,
delegation_state_transfer: bool = True,
delegation_message_transfer: bool = True,
delegation_state_serializer: Optional[Callable[[Any], Any]] = None,
max_delegation_depth: int = 10,
delegation_streaming_proxy: bool = True,
):
"""Initialize the Agent with the specified configuration.

Expand Down Expand Up @@ -268,6 +275,25 @@ def __init__(
session_manager: Manager for handling agent sessions including conversation history and state.
If provided, enables session-based persistence and state management.
tool_executor: Definition of tool execution stragety (e.g., sequential, concurrent, etc.).
sub_agents: List of sub-agents available for delegation.
Each sub-agent will have a corresponding handoff_to_{name} tool
auto-generated for complete delegation.
delegation_timeout: Timeout in seconds for delegation operations.
Defaults to 300 seconds (5 minutes). Set to None for no timeout.
delegation_state_transfer: Whether to transfer agent.state to sub-agents.
Defaults to True. When True, sub-agents receive a deep copy of the
orchestrator's state. When False, sub-agents use their own state.
delegation_message_transfer: Whether to transfer conversation history.
Defaults to True. Controls whether messages are copied to sub-agent.
max_delegation_depth: Maximum allowed depth for nested delegation.
Prevents infinite delegation chains. Defaults to 10.
delegation_state_serializer: Optional custom serializer for state transfer.
When provided, this callable will be used to serialize state instead of
deepcopy. Useful for large or complex states where deepcopy is inefficient.
Should return a serialized copy of the state.
delegation_streaming_proxy: Whether to proxy streaming events from sub-agents.
Defaults to True. When True, streaming events from sub-agents are
proxied back to the original caller for real-time visibility.

Raises:
ValueError: If agent id contains path separators.
Expand Down Expand Up @@ -349,6 +375,22 @@ def __init__(
self.hooks.add_hook(hook)
self.hooks.invoke_callbacks(AgentInitializedEvent(agent=self))

# Initialization of the sub-agents and delegation configuration

self._sub_agents: dict[str, "Agent"] = {}
self.delegation_timeout = delegation_timeout
self.delegation_state_transfer = delegation_state_transfer
self.delegation_message_transfer = delegation_message_transfer
self.delegation_state_serializer = delegation_state_serializer
self.max_delegation_depth = max_delegation_depth
self.delegation_streaming_proxy = delegation_streaming_proxy

if sub_agents:
self._validate_sub_agents(sub_agents)
for sub_agent in sub_agents:
self._sub_agents[sub_agent.name] = sub_agent
self._generate_delegation_tools(list(self._sub_agents.values()))

@property
def tool(self) -> ToolCaller:
"""Call tool as a function.
Expand Down Expand Up @@ -826,3 +868,237 @@ def _append_message(self, message: Message) -> None:
"""Appends a message to the agent's list of messages and invokes the callbacks for the MessageCreatedEvent."""
self.messages.append(message)
self.hooks.invoke_callbacks(MessageAddedEvent(agent=self, message=message))

@property
def sub_agents(self) -> dict[str, "Agent"]:
"""Get a copy of the registered sub-agents.

Returns:
Dictionary mapping agent names to Agent instances
"""
return self._sub_agents.copy()

def add_sub_agent(self, agent: "Agent") -> None:
"""Add a new sub-agent dynamically.

Args:
agent: Agent to add as a sub-agent

Raises:
ValueError: If agent validation fails
"""
self._validate_sub_agents([agent])
if agent.name not in self._sub_agents:
self._sub_agents[agent.name] = agent
self._generate_delegation_tools([agent])

# Invoke hook for consistency with agent lifecycle
if hasattr(self, "hooks"):
try:
from ..hooks import SubAgentAddedEvent

self.hooks.invoke_callbacks(
SubAgentAddedEvent(agent=self, sub_agent=agent, sub_agent_name=agent.name)
)
except ImportError:
# Hooks module not available, skip hook invocation
pass

def remove_sub_agent(self, agent_name: str) -> bool:
"""Remove a sub-agent and its delegation tool.

Args:
agent_name: Name of the sub-agent to remove

Returns:
True if agent was removed, False if not found
"""
if agent_name in self._sub_agents:
removed_agent = self._sub_agents[agent_name]
del self._sub_agents[agent_name]

# Remove delegation tool from registry
tool_name = f"handoff_to_{agent_name.lower().replace('-', '_')}"
if tool_name in self.tool_registry.registry:
del self.tool_registry.registry[tool_name]

# Invoke hook for cleanup
if hasattr(self, "hooks"):
try:
from ..hooks import SubAgentRemovedEvent

self.hooks.invoke_callbacks(
SubAgentRemovedEvent(agent=self, sub_agent_name=agent_name, removed_agent=removed_agent)
)
except ImportError:
# Hooks module not available, skip hook invocation
pass

return True
return False

def _validate_sub_agents(self, sub_agents: Optional[list["Agent"]]) -> None:
"""Validate sub-agent configuration.

Args:
sub_agents: List of sub-agents to validate

Raises:
ValueError: If sub-agent configuration is invalid
"""
if not sub_agents:
return

# Check for unique names
names = [agent.name for agent in sub_agents]
if len(names) != len(set(names)):
raise ValueError("Sub-agent names must be unique")

# Check for circular references
if self in sub_agents:
raise ValueError("Agent cannot delegate to itself")

# Check for duplicate names with existing tools
existing_tools = self.tool_names
for agent in sub_agents:
tool_name = f"handoff_to_{agent.name.lower().replace('-', '_')}"
if tool_name in existing_tools:
raise ValueError(f"Tool name conflict: {tool_name} already exists")

# Check for model compatibility if applicable
if hasattr(self, "model") and hasattr(self.model, "config"):
orchestrator_provider = self.model.config.get("provider")
if orchestrator_provider:
for agent in sub_agents:
if hasattr(agent, "model") and hasattr(agent.model, "config"):
sub_agent_provider = agent.model.config.get("provider")
if sub_agent_provider and sub_agent_provider != orchestrator_provider:
# Just a warning, not an error, as cross-provider delegation may be intentional
logger.warning(
"Model provider mismatch: %s uses %s, but sub-agent %s uses %s",
self.name,
orchestrator_provider,
agent.name,
sub_agent_provider,
)

def _generate_delegation_tools(self, sub_agents: list["Agent"]) -> None:
"""Generate delegation tools for sub-agents.

Args:
sub_agents: List of sub-agents to generate tools for
"""
from strands.tools import tool

for sub_agent in sub_agents:
tool_name = f"handoff_to_{sub_agent.name.lower().replace('-', '_')}"

# Create closure configuration to avoid memory leak from capturing self
delegation_config = {
"orchestrator_name": self.name,
"max_delegation_depth": getattr(self, "max_delegation_depth", None),
"delegation_state_transfer": self.delegation_state_transfer,
"delegation_message_transfer": self.delegation_message_transfer,
}

@tool(name=tool_name)
def delegation_tool(
message: str,
context: dict[str, Any] | None = None,
transfer_state: bool | None = None,
transfer_messages: bool | None = None,
target_agent: str = sub_agent.name,
delegation_chain: list[str] | None = None,
delegation_config: dict[str, Any] = delegation_config,
) -> dict[str, Any]:
"""Transfer control completely to specified sub-agent.

This tool completely delegates the current request to the target agent.
The orchestrator will terminate and the sub-agent's response will become
the final response with no additional processing.

Args:
message: Message to pass to the target agent
context: Additional context to transfer (optional)
transfer_state: Override the default state transfer behavior (optional)
transfer_messages: Override the default message transfer behavior (optional)
target_agent: Internal target agent identifier
delegation_chain: Internal delegation tracking
delegation_config: Delegation configuration (internal)

Returns:
This tool raises AgentDelegationException and does not return normally.
"""
current_depth = len(delegation_chain or [])
max_depth = delegation_config["max_delegation_depth"]
if max_depth and current_depth >= max_depth:
raise ValueError(f"Maximum delegation depth ({delegation_config['max_delegation_depth']}) exceeded")

orchestrator_name = delegation_config["orchestrator_name"]
state_transfer_default = delegation_config["delegation_state_transfer"]

raise AgentDelegationException(
target_agent=target_agent,
message=message,
context=context or {},
delegation_chain=(delegation_chain or []) + [orchestrator_name],
transfer_state=transfer_state if transfer_state is not None else state_transfer_default,
transfer_messages=transfer_messages
if transfer_messages is not None
else delegation_config["delegation_message_transfer"],
)

agent_description = sub_agent.description or f"Specialized agent named {sub_agent.name}"
capabilities_hint = ""
if hasattr(sub_agent, "tools") and sub_agent.tools:
tool_names = [
getattr(tool, "tool_name", getattr(tool, "__name__", str(tool))) for tool in sub_agent.tools[:3]
] # Show first 3 tools as hint
if tool_names:
capabilities_hint = f" Capabilities include: {', '.join(tool_names)}."

# Concise tool docstring to avoid prompt bloat
delegation_tool.__doc__ = (
f"Delegate to {sub_agent.name} ({agent_description}).{capabilities_hint}\n"
f"Transfers control completely - orchestrator terminates and "
f"{sub_agent.name}'s response becomes final.\n\n"
f"Use for: {agent_description.lower()}.\n"
f"Args:\n"
f" message: Message for {sub_agent.name} (required)\n"
f" context: Additional context (optional)\n"
f" transfer_state: Transfer orchestrator.state (optional)\n"
f" transfer_messages: Transfer conversation history (optional)\n"
f" target_agent: Internal identifier (hidden)\n"
f" delegation_chain: Delegation tracking (hidden)\n"
f" delegation_config: Delegation configuration (internal)"
)

# Set JSON schema for better validation and model understanding
# DecoratedFunctionTool doesn't have __schema__ by default, but Python allows
# setting arbitrary attributes dynamically
delegation_tool.__schema__ = {
"type": "object",
"properties": {
"message": {"type": "string", "description": f"Message to pass to {sub_agent.name}"},
"context": {"type": ["object", "null"], "description": "Additional context to transfer"},
"transfer_state": {
"type": ["boolean", "null"],
"description": "Whether to transfer orchestrator.state",
},
"transfer_messages": {
"type": ["boolean", "null"],
"description": "Whether to transfer conversation history",
},
"target_agent": {"type": "string", "description": "Internal target agent identifier"},
"delegation_chain": {
"type": "array",
"items": {"type": "string"},
"description": "Internal delegation tracking",
},
},
"required": ["message"],
"additionalProperties": False,
}

# Register the tool
self.tool_registry.register_tool(delegation_tool)
Loading