diff --git a/README.md b/README.md
index 60a102a5..88d273fe 100644
--- a/README.md
+++ b/README.md
@@ -1,415 +1,137 @@
-
-
-
-
-
-
-
-
+# Python Web Application
-## Overview
+## Project Description
-> [!TIP]
-> Documentation site is in production here : https://fast-agent.ai. Feel free to feed back what's helpful and what's not. There is also an LLMs.txt [here](https://fast-agent.ai/llms.txt)
+This is a comprehensive Python web application that [brief description of the project's main purpose and key features]. The application is designed to [explain the primary goal, target users, and main functionality].
-**`fast-agent`** enables you to create and interact with sophisticated Agents and Workflows in minutes. It is the first framework with complete, end-to-end tested MCP Feature support including Sampling. Model support is comprehensive with native support for Anthropic, OpenAI and Google as well as Azure, Ollama, Deepseek and dozens of others via TensorZero.
+Key features include:
+- Feature 1: Description of the first major feature
+- Feature 2: Description of the second major feature
+- Feature 3: Description of the third major feature
-
+## Setup Instructions
-The simple declarative syntax lets you concentrate on composing your Prompts and MCP Servers to [build effective agents](https://www.anthropic.com/research/building-effective-agents).
+### Prerequisites
-`fast-agent` is multi-modal, supporting Images and PDFs for both Anthropic and OpenAI endpoints via Prompts, Resources and MCP Tool Call results. The inclusion of passthrough and playback LLMs enable rapid development and test of Python glue-code for your applications.
+Before you begin, ensure you have the following installed:
+- Python 3.8 or higher
+- pip (Python package manager)
+- Virtual environment tool (venv recommended)
-> [!IMPORTANT]
->
-> `fast-agent` The fast-agent documentation repo is here: https://github.com/evalstate/fast-agent-docs. Please feel free to submit PRs for documentation, experience reports or other content you think others may find helpful. All help and feedback warmly received.
-
-### Agent Application Development
-
-Prompts and configurations that define your Agent Applications are stored in simple files, with minimal boilerplate, enabling simple management and version control.
-
-Chat with individual Agents and Components before, during and after workflow execution to tune and diagnose your application. Agents can request human input to get additional context for task completion.
-
-Simple model selection makes testing Model <-> MCP Server interaction painless. You can read more about the motivation behind this project [here](https://llmindset.co.uk/resources/fast-agent/)
-
-
-
-## Get started:
-
-Start by installing the [uv package manager](https://docs.astral.sh/uv/) for Python. Then:
+### Installation Steps
+1. Clone the repository:
```bash
-uv pip install fast-agent-mcp # install fast-agent!
-fast-agent go # start an interactive session
-fast-agent go https://hf.co/mcp # with a remote MCP
-fast-agent go --model=generic.qwen2.5 # use ollama qwen 2.5
-fast-agent setup # create an example agent and config files
-uv run agent.py # run your first agent
-uv run agent.py --model=o3-mini.low # specify a model
-fast-agent quickstart workflow # create "building effective agents" examples
+git clone https://github.com/yourusername/your-repo-name.git
+cd your-repo-name
```
-Other quickstart examples include a Researcher Agent (with Evaluator-Optimizer workflow) and Data Analysis Agent (similar to the ChatGPT experience), demonstrating MCP Roots support.
-
-> [!TIP]
-> Windows Users - there are a couple of configuration changes needed for the Filesystem and Docker MCP Servers - necessary changes are detailed within the configuration files.
-
-### Basic Agents
-
-Defining an agent is as simple as:
-
-```python
-@fast.agent(
- instruction="Given an object, respond only with an estimate of its size."
-)
-```
-
-We can then send messages to the Agent:
-
-```python
-async with fast.run() as agent:
- moon_size = await agent("the moon")
- print(moon_size)
-```
-
-Or start an interactive chat with the Agent:
-
-```python
-async with fast.run() as agent:
- await agent.interactive()
-```
-
-Here is the complete `sizer.py` Agent application, with boilerplate code:
-
-```python
-import asyncio
-from mcp_agent.core.fastagent import FastAgent
-
-# Create the application
-fast = FastAgent("Agent Example")
-
-@fast.agent(
- instruction="Given an object, respond only with an estimate of its size."
-)
-async def main():
- async with fast.run() as agent:
- await agent.interactive()
-
-if __name__ == "__main__":
- asyncio.run(main())
-```
-
-The Agent can then be run with `uv run sizer.py`.
-
-Specify a model with the `--model` switch - for example `uv run sizer.py --model sonnet`.
-
-### Combining Agents and using MCP Servers
-
-_To generate examples use `fast-agent quickstart workflow`. This example can be run with `uv run workflow/chaining.py`. fast-agent looks for configuration files in the current directory before checking parent directories recursively._
-
-Agents can be chained to build a workflow, using MCP Servers defined in the `fastagent.config.yaml` file:
-
-```python
-@fast.agent(
- "url_fetcher",
- "Given a URL, provide a complete and comprehensive summary",
- servers=["fetch"], # Name of an MCP Server defined in fastagent.config.yaml
-)
-@fast.agent(
- "social_media",
- """
- Write a 280 character social media post for any given text.
- Respond only with the post, never use hashtags.
- """,
-)
-@fast.chain(
- name="post_writer",
- sequence=["url_fetcher", "social_media"],
-)
-async def main():
- async with fast.run() as agent:
- # using chain workflow
- await agent.post_writer("http://llmindset.co.uk")
-```
-
-All Agents and Workflows respond to `.send("message")` or `.prompt()` to begin a chat session.
-
-Saved as `social.py` we can now run this workflow from the command line with:
-
+2. Create a virtual environment:
```bash
-uv run workflow/chaining.py --agent post_writer --message ""
-```
-
-Add the `--quiet` switch to disable progress and message display and return only the final response - useful for simple automations.
-
-## Workflows
-
-### Chain
-
-The `chain` workflow offers a more declarative approach to calling Agents in sequence:
-
-```python
-
-@fast.chain(
- "post_writer",
- sequence=["url_fetcher","social_media"]
-)
-
-# we can them prompt it directly:
-async with fast.run() as agent:
- await agent.post_writer()
-
+python3 -m venv venv
```
-This starts an interactive session, which produces a short social media post for a given URL. If a _chain_ is prompted it returns to a chat with last Agent in the chain. You can switch the agent to prompt by typing `@agent-name`.
-
-Chains can be incorporated in other workflows, or contain other workflow elements (including other Chains). You can set an `instruction` to precisely describe it's capabilities to other workflow steps if needed.
-
-### Human Input
-
-Agents can request Human Input to assist with a task or get additional context:
-
-```python
-@fast.agent(
- instruction="An AI agent that assists with basic tasks. Request Human Input when needed.",
- human_input=True,
-)
-
-await agent("print the next number in the sequence")
-```
-
-In the example `human_input.py`, the Agent will prompt the User for additional information to complete the task.
-
-### Parallel
-
-The Parallel Workflow sends the same message to multiple Agents simultaneously (`fan-out`), then uses the `fan-in` Agent to process the combined content.
-
-```python
-@fast.agent("translate_fr", "Translate the text to French")
-@fast.agent("translate_de", "Translate the text to German")
-@fast.agent("translate_es", "Translate the text to Spanish")
-
-@fast.parallel(
- name="translate",
- fan_out=["translate_fr","translate_de","translate_es"]
-)
-
-@fast.chain(
- "post_writer",
- sequence=["url_fetcher","social_media","translate"]
-)
-```
-
-If you don't specify a `fan-in` agent, the `parallel` returns the combined Agent results verbatim.
-
-`parallel` is also useful to ensemble ideas from different LLMs.
-
-When using `parallel` in other workflows, specify an `instruction` to describe its operation.
-
-### Evaluator-Optimizer
-
-Evaluator-Optimizers combine 2 agents: one to generate content (the `generator`), and the other to judge that content and provide actionable feedback (the `evaluator`). Messages are sent to the generator first, then the pair run in a loop until either the evaluator is satisfied with the quality, or the maximum number of refinements is reached. The final result from the Generator is returned.
-
-If the Generator has `use_history` off, the previous iteration is returned when asking for improvements - otherwise conversational context is used.
-
-```python
-@fast.evaluator_optimizer(
- name="researcher",
- generator="web_searcher",
- evaluator="quality_assurance",
- min_rating="EXCELLENT",
- max_refinements=3
-)
-
-async with fast.run() as agent:
- await agent.researcher.send("produce a report on how to make the perfect espresso")
-```
-
-When used in a workflow, it returns the last `generator` message as the result.
-
-See the `evaluator.py` workflow example, or `fast-agent quickstart researcher` for a more complete example.
-
-### Router
-
-Routers use an LLM to assess a message, and route it to the most appropriate Agent. The routing prompt is automatically generated based on the Agent instructions and available Servers.
-
-```python
-@fast.router(
- name="route",
- agents=["agent1","agent2","agent3"]
-)
+3. Activate the virtual environment:
+- On Windows:
+```bash
+venv\Scripts\activate
```
-
-Look at the `router.py` workflow for an example.
-
-### Orchestrator
-
-Given a complex task, the Orchestrator uses an LLM to generate a plan to divide the task amongst the available Agents. The planning and aggregation prompts are generated by the Orchestrator, which benefits from using more capable models. Plans can either be built once at the beginning (`plantype="full"`) or iteratively (`plantype="iterative"`).
-
-```python
-@fast.orchestrator(
- name="orchestrate",
- agents=["task1","task2","task3"]
-)
+- On macOS and Linux:
+```bash
+source venv/bin/activate
```
-See the `orchestrator.py` or `agent_build.py` workflow example.
-
-## Agent Features
-
-### Calling Agents
-
-All definitions allow omitting the name and instructions arguments for brevity:
-
-```python
-@fast.agent("You are a helpful agent") # Create an agent with a default name.
-@fast.agent("greeter","Respond cheerfully!") # Create an agent with the name "greeter"
-
-moon_size = await agent("the moon") # Call the default (first defined agent) with a message
-
-result = await agent.greeter("Good morning!") # Send a message to an agent by name using dot notation
-result = await agent.greeter.send("Hello!") # You can call 'send' explicitly
-
-await agent.greeter() # If no message is specified, a chat session will open
-await agent.greeter.prompt() # that can be made more explicit
-await agent.greeter.prompt(default_prompt="OK") # and supports setting a default prompt
-
-agent["greeter"].send("Good Evening!") # Dictionary access is supported if preferred
+4. Install dependencies:
+```bash
+pip install -r requirements.txt
```
-### Defining Agents
+5. Set up environment variables:
+- Create a `.env` file in the project root
+- Add necessary configuration variables (example in `.env.example`)
-#### Basic Agent
+## Project Structure
-```python
-@fast.agent(
- name="agent", # name of the agent
- instruction="You are a helpful Agent", # base instruction for the agent
- servers=["filesystem"], # list of MCP Servers for the agent
- model="o3-mini.high", # specify a model for the agent
- use_history=True, # agent maintains chat history
- request_params=RequestParams(temperature= 0.7), # additional parameters for the LLM (or RequestParams())
- human_input=True, # agent can request human input
-)
```
-
-#### Chain
-
-```python
-@fast.chain(
- name="chain", # name of the chain
- sequence=["agent1", "agent2", ...], # list of agents in execution order
- instruction="instruction", # instruction to describe the chain for other workflows
- cumulative=False, # whether to accumulate messages through the chain
- continue_with_final=True, # open chat with agent at end of chain after prompting
-)
-```
-
-#### Parallel
-
-```python
-@fast.parallel(
- name="parallel", # name of the parallel workflow
- fan_out=["agent1", "agent2"], # list of agents to run in parallel
- fan_in="aggregator", # name of agent that combines results (optional)
- instruction="instruction", # instruction to describe the parallel for other workflows
- include_request=True, # include original request in fan-in message
-)
+your-project-name/
+│
+├── app/ # Main application package
+│ ├── __init__.py
+│ ├── main.py # Main application logic
+│ ├── models/ # Database models
+│ ├── routes/ # Route handlers
+│ └── templates/ # HTML templates
+│
+├── tests/ # Unit and integration tests
+│ ├── test_main.py
+│ └── test_models.py
+│
+├── static/ # Static files (CSS, JS, images)
+│ ├── css/
+│ └── js/
+│
+├── requirements.txt # Project dependencies
+├── README.md # Project documentation
+└── .env # Environment configuration
```
-#### Evaluator-Optimizer
-
-```python
-@fast.evaluator_optimizer(
- name="researcher", # name of the workflow
- generator="web_searcher", # name of the content generator agent
- evaluator="quality_assurance", # name of the evaluator agent
- min_rating="GOOD", # minimum acceptable quality (EXCELLENT, GOOD, FAIR, POOR)
- max_refinements=3, # maximum number of refinement iterations
-)
-```
+## How to Run
-#### Router
+### Development Server
-```python
-@fast.router(
- name="route", # name of the router
- agents=["agent1", "agent2", "agent3"], # list of agent names router can delegate to
- model="o3-mini.high", # specify routing model
- use_history=False, # router maintains conversation history
- human_input=False, # whether router can request human input
-)
+To run the application in development mode:
+```bash
+python app/main.py
```
-#### Orchestrator
+### Production Deployment
-```python
-@fast.orchestrator(
- name="orchestrator", # name of the orchestrator
- instruction="instruction", # base instruction for the orchestrator
- agents=["agent1", "agent2"], # list of agent names this orchestrator can use
- model="o3-mini.high", # specify orchestrator planning model
- use_history=False, # orchestrator doesn't maintain chat history (no effect).
- human_input=False, # whether orchestrator can request human input
- plan_type="full", # planning approach: "full" or "iterative"
- plan_iterations=5, # maximum number of full plan attempts, or iterations
-)
+For production, we recommend using a WSGI server like Gunicorn:
+```bash
+gunicorn -w 4 app.main:app
```
-### Multimodal Support
+### Running Tests
-Add Resources to prompts using either the inbuilt `prompt-server` or MCP Types directly. Convenience class are made available to do so simply, for example:
-
-```python
- summary: str = await agent.with_resource(
- "Summarise this PDF please",
- "mcp_server",
- "resource://fast-agent/sample.pdf",
- )
+Execute tests using pytest:
+```bash
+pytest tests/
```
-#### MCP Tool Result Conversion
-
-LLM APIs have restrictions on the content types that can be returned as Tool Calls/Function results via their Chat Completions API's:
-
-- OpenAI supports Text
-- Anthropic supports Text and Image
-
-For MCP Tool Results, `ImageResources` and `EmbeddedResources` are converted to User Messages and added to the conversation.
+## Contributing Guidelines
-### Prompts
-
-MCP Prompts are supported with `apply_prompt(name,arguments)`, which always returns an Assistant Message. If the last message from the MCP Server is a 'User' message, it is sent to the LLM for processing. Prompts applied to the Agent's Context are retained - meaning that with `use_history=False`, Agents can act as finely tuned responders.
-
-Prompts can also be applied interactively through the interactive interface by using the `/prompt` command.
-
-### Sampling
-
-Sampling LLMs are configured per Client/Server pair. Specify the model name in fastagent.config.yaml as follows:
-
-```yaml
-mcp:
- servers:
- sampling_resource:
- command: "uv"
- args: ["run", "sampling_resource_server.py"]
- sampling:
- model: "haiku"
-```
+We welcome contributions to this project! Here's how you can help:
-### Secrets File
+### Reporting Issues
+- Use GitHub Issues to report bugs
+- Provide a clear and detailed description
+- Include steps to reproduce the issue
+- Specify your environment (OS, Python version)
-> [!TIP]
-> fast-agent will look recursively for a fastagent.secrets.yaml file, so you only need to manage this at the root folder of your agent definitions.
+### Making Contributions
+1. Fork the repository
+2. Create a new branch (`git checkout -b feature/your-feature-name`)
+3. Make your changes
+4. Write or update tests as needed
+5. Ensure all tests pass
+6. Commit with a clear, descriptive commit message
+7. Push to your fork and submit a pull request
-### Interactive Shell
+### Code Style
+- Follow PEP 8 guidelines
+- Use type hints
+- Write docstrings for all functions and classes
+- Maintain consistent code formatting
-
+### Code of Conduct
+- Be respectful and inclusive
+- Provide constructive feedback
+- Collaborate and communicate openly
-## Project Notes
+## License
-`fast-agent` builds on the [`mcp-agent`](https://github.com/lastmile-ai/mcp-agent) project by Sarmad Qadri.
+[Specify your project's license, e.g., MIT, Apache 2.0]
-### Contributing
+## Contact
-Contributions and PRs are welcome - feel free to raise issues to discuss. Full guidelines for contributing and roadmap coming very soon. Get in touch!
+For questions or support, please contact [your email or preferred contact method].
\ No newline at end of file
diff --git a/examples/dynamic-agents/.gitignore b/examples/dynamic-agents/.gitignore
new file mode 100644
index 00000000..5c5c3137
--- /dev/null
+++ b/examples/dynamic-agents/.gitignore
@@ -0,0 +1,2 @@
+# Dynamic agents workspace directory
+workspace/
\ No newline at end of file
diff --git a/examples/dynamic-agents/README.md b/examples/dynamic-agents/README.md
new file mode 100644
index 00000000..552d28e4
--- /dev/null
+++ b/examples/dynamic-agents/README.md
@@ -0,0 +1,175 @@
+# Dynamic Agents Examples
+
+This directory contains examples demonstrating the dynamic agent creation capability in FastAgent. Dynamic agents can be created at runtime based on task analysis, allowing for adaptive team composition.
+
+## Features
+
+- **Runtime Agent Creation**: Create specialized agents on-the-fly
+- **Parallel Execution**: Multiple agents can work simultaneously
+- **Lifecycle Management**: Create, use, and terminate agents as needed
+- **Tool Access**: Dynamic agents can use MCP servers and tools
+- **Tree Display**: Visual representation of agent hierarchy
+
+## Available Examples
+
+### 1. Project Manager (`project_manager.py`)
+Demonstrates a project manager that creates and coordinates development teams for software projects.
+
+```bash
+# Run the full demo
+python project_manager.py
+
+# Interactive mode
+python project_manager.py interactive
+```
+
+### 2. Simple Demo (`simple_demo.py`)
+Basic demonstration of dynamic agent concepts with easy-to-understand examples.
+
+```bash
+# Run all simple examples
+python simple_demo.py
+
+# Run just the basic example
+python simple_demo.py basic
+
+# Run just the delegation example
+python simple_demo.py delegation
+```
+
+### 3. Code Review Demo (`code_review_demo.py`)
+Shows specialized code review teams that analyze code from different perspectives.
+
+```bash
+# Run all review examples
+python code_review_demo.py
+
+# Run just security-focused review
+python code_review_demo.py security
+
+# Run comprehensive review
+python code_review_demo.py comprehensive
+```
+
+### 4. Interactive Demo (`interactive_demo.py`)
+Interactive playground for experimenting with dynamic agents.
+
+```bash
+# Interactive mode
+python interactive_demo.py
+
+# Guided scenarios
+python interactive_demo.py guided
+
+# Quick demonstration
+python interactive_demo.py quick
+```
+
+### 5. Original Example (`example.py`)
+The original comprehensive example with multiple scenarios in one file.
+
+```bash
+# Full demo
+python example.py
+
+# Simple example
+python example.py simple
+
+# Interactive mode
+python example.py interactive
+```
+
+## How It Works
+
+### 1. Enable Dynamic Agents
+```python
+@fast.agent(
+ name="project_manager",
+ dynamic_agents=True, # Enable dynamic agent creation
+ max_dynamic_agents=5, # Limit to 5 agents
+ servers=["filesystem", "fetch"] # MCP servers available to dynamic agents
+)
+```
+
+### 2. Create Dynamic Agents
+The agent uses tools to create specialists:
+```python
+# Creates a frontend developer agent
+dynamic_agent_create({
+ "name": "frontend_dev",
+ "instruction": "You are a React/TypeScript expert...",
+ "servers": ["filesystem"],
+ "tools": {"filesystem": ["read*", "write*"]}
+})
+```
+
+### 3. Delegate Tasks
+```python
+# Send task to specific agent
+dynamic_agent_send({
+ "agent_id": "frontend_dev_abc123",
+ "message": "Create the main App component"
+})
+
+# Broadcast to multiple agents (parallel execution)
+dynamic_agent_broadcast({
+ "message": "Review this code for issues",
+ "agent_ids": ["security_expert", "performance_expert"],
+ "parallel": true
+})
+```
+
+## Available Tools
+
+When `dynamic_agents=True`, the agent gets these tools:
+
+- **dynamic_agent_create**: Create new specialized agents
+- **dynamic_agent_send**: Send messages to specific agents
+- **dynamic_agent_broadcast**: Send messages to multiple agents in parallel
+- **dynamic_agent_list**: List all active dynamic agents
+- **dynamic_agent_terminate**: Clean up agents when done
+
+## Use Cases
+
+### 1. Development Teams
+- Frontend/Backend/Database specialists
+- Code reviewers with different focuses
+- DevOps and QA specialists
+
+### 2. Content Creation
+- Writers, editors, fact-checkers
+- Specialized content for different audiences
+
+### 3. Data Analysis
+- Data collectors, cleaners, analyzers
+- Visualization and reporting specialists
+
+### 4. Research Projects
+- Domain experts for different topics
+- Fact-checkers and synthesizers
+
+## Architecture
+
+Dynamic agents follow the same patterns as parallel agents:
+- **Same Process**: All run in the same Python process
+- **Shared Context**: Use the same MCP connections
+- **Separate LLM Contexts**: Each has its own conversation history
+- **Parallel Execution**: Use `asyncio.gather()` like ParallelAgent
+- **Tree Display**: Extend parallel agent display patterns
+
+## Configuration
+
+Dynamic agents can only use MCP servers defined in `fastagent.config.yaml`. They cannot create new MCP connections, but can be configured with:
+
+- **Different instruction/role**
+- **Subset of MCP servers**
+- **Filtered tools from those servers**
+- **Different models**
+- **Own conversation context**
+
+## Limitations
+
+- Maximum number of agents enforced
+- Can only use pre-configured MCP servers
+- Exist only during parent agent's lifetime
+- No persistence across sessions
\ No newline at end of file
diff --git a/examples/dynamic-agents/code_review_demo.py b/examples/dynamic-agents/code_review_demo.py
new file mode 100644
index 00000000..f865006c
--- /dev/null
+++ b/examples/dynamic-agents/code_review_demo.py
@@ -0,0 +1,155 @@
+"""
+Code Review Team Dynamic Agents Example
+
+This example demonstrates creating specialized code review teams that can analyze
+code from different perspectives simultaneously.
+"""
+
+import asyncio
+
+from mcp_agent.core.fastagent import FastAgent
+
+# Create the application
+fast = FastAgent("Code Review Team Demo")
+
+
+# Sample problematic code for review
+SAMPLE_CODE = '''
+import hashlib
+import sqlite3
+import os
+
+def authenticate_user(username, password):
+ conn = sqlite3.connect('users.db')
+ cursor = conn.cursor()
+ query = f"SELECT * FROM users WHERE username='{username}'"
+ cursor.execute(query)
+ user = cursor.fetchone()
+
+ if user and user[2] == password:
+ return True
+ return False
+
+def create_user(username, password):
+ conn = sqlite3.connect('users.db')
+ cursor = conn.cursor()
+ cursor.execute(f"INSERT INTO users VALUES ('{username}', '{password}')")
+ conn.commit()
+ conn.close()
+
+def process_large_dataset(data):
+ results = []
+ for item in data:
+ # Inefficient nested loops
+ for i in range(len(data)):
+ for j in range(len(data)):
+ if data[i] == data[j]:
+ results.append(item)
+ return results
+
+class UserManager:
+ def __init__(self):
+ self.users = []
+
+ def add_user(self, user):
+ self.users.append(user)
+
+ def find_user(self, username):
+ for user in self.users:
+ if user.username == username:
+ return user
+'''
+
+
+@fast.agent(
+ name="review_coordinator",
+ instruction="""You are a code review coordinator that creates specialized review teams.
+
+When given code to review, you should:
+1. Create different types of reviewers with specific expertise
+2. Have them analyze the code in parallel from their perspectives
+3. Consolidate their findings into a comprehensive report
+
+Types of reviewers you can create:
+- Security Reviewer: Focuses on vulnerabilities, injection attacks, authentication
+- Performance Reviewer: Looks for optimization opportunities, bottlenecks
+- Code Quality Reviewer: Examines maintainability, readability, best practices
+- Architecture Reviewer: Analyzes design patterns, structure, scalability
+
+Use dynamic agent tools to create and coordinate the review team.""",
+ servers=["filesystem"],
+ dynamic_agents=True,
+ max_dynamic_agents=6,
+ model="haiku"
+)
+async def main():
+ async with fast.run() as agent:
+ print("=== Code Review Team Demo ===\n")
+
+ await agent.review_coordinator(f"""
+ I have a Python codebase that needs a comprehensive review.
+ Create a specialized code review team with different focuses:
+ 1. Security reviewer for vulnerability assessment
+ 2. Performance reviewer for optimization opportunities
+ 3. Code quality reviewer for maintainability
+ 4. Architecture reviewer for design patterns
+
+ Then have them review this sample code in parallel and provide a consolidated report:
+
+ ```python
+ {SAMPLE_CODE}
+ ```
+
+ Each reviewer should focus on their specialty and provide specific recommendations.
+ """)
+
+
+@fast.agent(
+ name="security_focused_reviewer",
+ instruction="""You are a security-focused code reviewer that creates specialized
+ security analysis teams.
+
+Create agents that focus on different security aspects:
+- Input validation specialist
+- Authentication security expert
+- Database security analyst
+- General security vulnerability scanner""",
+ servers=["filesystem"],
+ dynamic_agents=True,
+ max_dynamic_agents=4,
+ model="haiku"
+)
+async def security_review_example():
+ async with fast.run() as agent:
+ print("\n=== Security-Focused Review Example ===\n")
+
+ await agent.security_focused_reviewer(f"""
+ Create a specialized security review team to analyze this code for vulnerabilities:
+
+ ```python
+ {SAMPLE_CODE}
+ ```
+
+ Create different security specialists and have them each focus on their area of expertise.
+ Provide a detailed security assessment with risk levels and remediation steps.
+ """)
+
+
+async def run_all_reviews():
+ """Run all code review examples."""
+ print("Running Code Review Examples...\n")
+
+ await main()
+ print("\n" + "="*60 + "\n")
+ await security_review_example()
+
+
+if __name__ == "__main__":
+ import sys
+
+ if len(sys.argv) > 1 and sys.argv[1] == "security":
+ asyncio.run(security_review_example())
+ elif len(sys.argv) > 1 and sys.argv[1] == "comprehensive":
+ asyncio.run(main())
+ else:
+ asyncio.run(run_all_reviews())
\ No newline at end of file
diff --git a/examples/dynamic-agents/example.py b/examples/dynamic-agents/example.py
new file mode 100644
index 00000000..f4fcca67
--- /dev/null
+++ b/examples/dynamic-agents/example.py
@@ -0,0 +1,162 @@
+"""
+Dynamic Agents Example
+
+This example demonstrates how to use dynamic agents that can be created at runtime
+based on task analysis. The project manager creates specialized teams on-the-fly.
+"""
+
+import asyncio
+
+from mcp_agent.core.fastagent import FastAgent
+
+# Create the application
+fast = FastAgent("Dynamic Agents Example")
+
+
+@fast.agent(
+ name="project_manager",
+ instruction="""You are a project manager that creates and manages specialized development teams.
+
+When given a project, you should:
+1. Analyze what specialists are needed
+2. Create appropriate dynamic agents with specific roles
+3. Delegate tasks to the specialists
+4. Coordinate their work to complete the project
+
+You have access to dynamic agent tools:
+- dynamic_agent_create: Create new specialized agents
+- dynamic_agent_send: Send tasks to specific agents
+- dynamic_agent_broadcast: Send tasks to multiple agents in parallel
+- dynamic_agent_list: See all your active agents
+- dynamic_agent_terminate: Clean up agents when done
+
+Available MCP servers for your specialists:
+- filesystem: For reading/writing files
+- fetch: For web requests and API calls
+
+Example specialist roles:
+- Frontend Developer (React/TypeScript expert)
+- Backend Developer (Python/FastAPI expert)
+- Database Designer (SQL/schema expert)
+- Security Reviewer (security best practices)
+- DevOps Engineer (deployment and infrastructure)
+- QA Tester (testing and quality assurance)
+""",
+ servers=["filesystem", "fetch"],
+ dynamic_agents=True,
+ max_dynamic_agents=5,
+ model="haiku",
+)
+async def main():
+ async with fast.run() as agent:
+ print("=== Dynamic Agents Demo ===\n")
+
+ # Example 1: Web Development Project
+ print("Example 1: Building a Todo App")
+ await agent.project_manager("""
+ I need to build a React todo application with the following requirements:
+ 1. Frontend: React with TypeScript, modern hooks, responsive design
+ 2. Backend: Python FastAPI with RESTful endpoints
+ 3. Database: PostgreSQL schema design
+ 4. Security: Authentication, input validation, CORS
+ 5. Testing: Unit tests and integration tests
+
+ Please create appropriate specialists and coordinate their work to:
+ - Design the application architecture
+ - Create the database schema
+ - Build the backend API
+ - Develop the React frontend
+ - Implement security measures
+ - Write comprehensive tests
+
+ Show me the team you create and how you delegate the work.
+ """)
+
+ print("\n" + "=" * 50 + "\n")
+
+ # Example 2: Code Review Project
+ print("Example 2: Code Review Team")
+ await agent.project_manager("""
+ I have a large Python codebase that needs a comprehensive review.
+ Create a specialized code review team with different focuses:
+ 1. Security reviewer for vulnerability assessment
+ 2. Performance reviewer for optimization opportunities
+ 3. Code quality reviewer for maintainability
+ 4. Architecture reviewer for design patterns
+
+ Then have them review this sample code in parallel and provide a consolidated report:
+
+ ```python
+ import hashlib
+ import sqlite3
+
+ def authenticate_user(username, password):
+ conn = sqlite3.connect('users.db')
+ cursor = conn.cursor()
+ query = f"SELECT * FROM users WHERE username='{username}'"
+ cursor.execute(query)
+ user = cursor.fetchone()
+
+ if user and user[2] == password:
+ return True
+ return False
+
+ def create_user(username, password):
+ conn = sqlite3.connect('users.db')
+ cursor = conn.cursor()
+ cursor.execute(f"INSERT INTO users VALUES ('{username}', '{password}')")
+ conn.commit()
+ conn.close()
+ ```
+ """)
+
+
+@fast.agent(
+ name="simple_creator",
+ instruction="""You are a simple agent that demonstrates basic dynamic agent creation.
+
+You can create other agents and delegate simple tasks to them.
+Show how to create, use, and manage dynamic agents step by step.""",
+ servers=["filesystem"],
+ dynamic_agents=True,
+ max_dynamic_agents=3,
+)
+async def simple_example():
+ async with fast.run() as agent:
+ print("\n=== Simple Dynamic Agent Example ===\n")
+
+ await agent.simple_creator("""
+ Please demonstrate the dynamic agent system by:
+ 1. Creating a file organizer agent that can read and organize files
+ 2. Creating a content writer agent that can write documentation
+ 3. List your active agents
+ 4. Have the file organizer create a project structure
+ 5. Have the content writer create a README file
+ 6. Show the results of their work
+ 7. Clean up by terminating the agents
+ """)
+
+
+async def interactive_demo():
+ """Run an interactive demo where users can experiment with dynamic agents."""
+ async with fast.run() as agent:
+ print("\n=== Interactive Dynamic Agents Demo ===")
+ print("You can now interact with the project manager!")
+ print("Try commands like:")
+ print("- 'Create a mobile app development team'")
+ print("- 'Build a data analysis pipeline'")
+ print("- 'Set up a microservices architecture'")
+ print("- Type 'exit' to quit\n")
+
+ await agent.project_manager.interactive()
+
+
+if __name__ == "__main__":
+ import sys
+
+ if len(sys.argv) > 1 and sys.argv[1] == "simple":
+ asyncio.run(simple_example())
+ elif len(sys.argv) > 1 and sys.argv[1] == "interactive":
+ asyncio.run(interactive_demo())
+ else:
+ asyncio.run(main())
diff --git a/examples/dynamic-agents/fastagent.config.yaml b/examples/dynamic-agents/fastagent.config.yaml
new file mode 100644
index 00000000..80700100
--- /dev/null
+++ b/examples/dynamic-agents/fastagent.config.yaml
@@ -0,0 +1,23 @@
+# FastAgent configuration for Dynamic Agents example
+#
+# This configuration includes MCP servers that dynamic agents can use
+
+default_model: "gpt-4.1" # Fast model for demonstrations
+
+app:
+ log_dir: ".logs"
+
+mcp:
+ defaults:
+ auto_install_dependencies: true
+
+ servers:
+ # Filesystem server for file operations
+ filesystem:
+ command: "npx"
+ args: ["-y", "@modelcontextprotocol/server-filesystem", "."]
+
+ # Fetch server for web requests
+ fetch:
+ command: "uvx"
+ args: ["mcp-server-fetch"]
diff --git a/examples/dynamic-agents/interactive_demo.py b/examples/dynamic-agents/interactive_demo.py
new file mode 100644
index 00000000..fc64b98a
--- /dev/null
+++ b/examples/dynamic-agents/interactive_demo.py
@@ -0,0 +1,140 @@
+"""
+Interactive Dynamic Agents Demo
+
+This example provides an interactive interface where users can experiment
+with dynamic agents and see how they work in real-time.
+"""
+
+import asyncio
+
+from mcp_agent.core.fastagent import FastAgent
+
+# Create the application
+fast = FastAgent("Interactive Dynamic Agents Demo")
+
+
+@fast.agent(
+ name="interactive_manager",
+ instruction="""You are an interactive agent manager that helps users explore dynamic agents.
+
+You can create any type of specialist agent based on the user's needs:
+- Development teams (frontend, backend, DevOps, etc.)
+- Analysis teams (data scientists, researchers, etc.)
+- Creative teams (writers, designers, etc.)
+- Business teams (product managers, marketers, etc.)
+
+When a user asks for something:
+1. Analyze what type of specialists would be helpful
+2. Create appropriate dynamic agents
+3. Demonstrate how they work together
+4. Show the user the results
+
+Be conversational and educational - explain what you're doing and why.""",
+ servers=["filesystem", "fetch"],
+ dynamic_agents=True,
+ max_dynamic_agents=6,
+ model="haiku",
+)
+async def interactive_demo():
+ """Run an interactive demo where users can experiment with dynamic agents."""
+ async with fast.run() as agent:
+ print("=== Interactive Dynamic Agents Demo ===")
+ print()
+ print("🤖 Welcome to the Dynamic Agents playground!")
+ print()
+ print("You can ask me to create specialized teams for any task. Try:")
+ print(" • 'Create a web development team for an e-commerce site'")
+ print(" • 'Build a data analysis team to analyze sales data'")
+ print(" • 'Set up a content creation team for a marketing campaign'")
+ print(" • 'Create a code review team for a Python project'")
+ print(" • 'Build a research team to analyze market trends'")
+ print()
+ print("Type 'help' for more examples or 'exit' to quit")
+ print("=" * 60)
+ print()
+
+ await agent.interactive("interactive_manager")
+
+
+@fast.agent(
+ name="demo_guide",
+ instruction="""You are a helpful guide that demonstrates dynamic agents with pre-built examples.
+
+You have several demo scenarios ready to show:
+1. Software development team
+2. Content creation team
+3. Research and analysis team
+4. Marketing team
+5. Customer support team
+
+When asked, create the appropriate team and walk through a realistic scenario.""",
+ servers=["filesystem", "fetch"],
+ dynamic_agents=True,
+ max_dynamic_agents=5,
+)
+async def guided_demo():
+ """Run a guided demo with pre-built scenarios."""
+ async with fast.run() as agent:
+ print("=== Guided Dynamic Agents Demo ===")
+ print()
+ print("🎯 Choose a demo scenario:")
+ print(" 1. Software Development Team")
+ print(" 2. Content Creation Team")
+ print(" 3. Research & Analysis Team")
+ print(" 4. Marketing Team")
+ print(" 5. Customer Support Team")
+ print(" 6. All scenarios (sequential)")
+ print()
+
+ choice = input("Enter your choice (1-6): ").strip()
+
+ scenarios = {
+ "1": "Create a full-stack development team to build a social media platform",
+ "2": "Create a content team to produce a comprehensive product launch campaign",
+ "3": "Create a research team to analyze competitor strategies in the AI market",
+ "4": "Create a marketing team to launch a new mobile app",
+ "5": "Create a customer support team to handle technical inquiries",
+ "6": "all",
+ }
+
+ if choice == "6":
+ for i, scenario in enumerate(scenarios.values(), 1):
+ if scenario == "all":
+ continue
+ print(f"\n{'=' * 60}")
+ print(f"Demo {i}: {scenario}")
+ print("=" * 60)
+ await agent.demo_guide(scenario)
+ if i < 5: # Don't wait after the last demo
+ input("\nPress Enter to continue to the next demo...")
+ elif choice in scenarios:
+ await agent.demo_guide(scenarios[choice])
+ else:
+ print("Invalid choice. Running interactive demo instead...")
+ await interactive_demo()
+
+
+async def quick_demo():
+ """A quick demonstration of dynamic agents."""
+ async with fast.run() as agent:
+ print("=== Quick Dynamic Agents Demo ===")
+
+ await agent.demo_guide("""
+ Give me a quick demonstration of dynamic agents by:
+ 1. Creating 2-3 different specialist agents
+ 2. Showing how they can work together on a simple project
+ 3. Demonstrating their different capabilities
+
+ Keep it concise but informative.
+ """)
+
+
+if __name__ == "__main__":
+ import sys
+
+ if len(sys.argv) > 1 and sys.argv[1] == "guided":
+ asyncio.run(guided_demo())
+ elif len(sys.argv) > 1 and sys.argv[1] == "quick":
+ asyncio.run(quick_demo())
+ else:
+ asyncio.run(interactive_demo())
diff --git a/examples/dynamic-agents/project_manager.py b/examples/dynamic-agents/project_manager.py
new file mode 100644
index 00000000..7a4f37f8
--- /dev/null
+++ b/examples/dynamic-agents/project_manager.py
@@ -0,0 +1,113 @@
+"""
+Project Manager Dynamic Agents Example
+
+This example demonstrates a project manager that creates specialized development teams
+to handle complex software projects. The manager analyzes requirements and creates
+appropriate specialists on-the-fly.
+"""
+
+import asyncio
+
+from mcp_agent.core.fastagent import FastAgent
+
+# Create the application
+fast = FastAgent("Project Manager Demo")
+
+
+@fast.agent(
+ name="project_manager",
+ instruction="""You are a project manager that creates and manages specialized development teams.
+
+When given a project, you should:
+1. Analyze what specialists are needed
+2. Create appropriate dynamic agents with specific roles
+3. Delegate tasks to the specialists
+4. Coordinate their work to complete the project
+
+You have access to dynamic agent tools:
+- dynamic_agent_create: Create new specialized agents
+- dynamic_agent_send: Send tasks to specific agents
+- dynamic_agent_broadcast: Send tasks to multiple agents in parallel
+- dynamic_agent_list: See all your active agents
+- dynamic_agent_terminate: Clean up agents when done
+
+Available MCP servers for your specialists:
+- filesystem: For reading/writing files
+- fetch: For web requests and API calls
+
+Example specialist roles:
+- Frontend Developer (React/TypeScript expert)
+- Backend Developer (Python/FastAPI expert)
+- Database Designer (SQL/schema expert)
+- Security Reviewer (security best practices)
+- DevOps Engineer (deployment and infrastructure)
+- QA Tester (testing and quality assurance)
+""",
+ servers=["filesystem", "fetch"],
+ dynamic_agents=True,
+ max_dynamic_agents=5,
+ model="haiku",
+)
+async def main():
+ async with fast.run() as agent:
+ print("=== Project Manager Demo ===\n")
+
+ # Example 1: Web Development Project
+ print("Example 1: Building a Todo App")
+ await agent.project_manager("""
+ I need to build a React todo application with the following requirements:
+ 1. Frontend: React with TypeScript, modern hooks, responsive design
+ 2. Backend: Python FastAPI with RESTful endpoints
+ 3. Database: PostgreSQL schema design
+ 4. Security: Authentication, input validation, CORS
+ 5. Testing: Unit tests and integration tests
+
+ Please create appropriate specialists and coordinate their work to:
+ - Design the application architecture
+ - Create the database schema
+ - Build the backend API
+ - Develop the React frontend
+ - Implement security measures
+ - Write comprehensive tests
+
+ Show me the team you create and how you delegate the work.
+ """)
+
+ print("\n" + "=" * 50 + "\n")
+
+ # Example 2: Mobile App Project
+ print("Example 2: Mobile App Development")
+ await agent.project_manager("""
+ I need to create a mobile app for a fitness tracking platform:
+ 1. React Native app with offline capabilities
+ 2. Node.js backend with real-time features
+ 3. MongoDB for flexible data storage
+ 4. Integration with health APIs (Apple Health, Google Fit)
+ 5. Push notifications and analytics
+
+ Create a specialized team to handle this project and show how you coordinate
+ the development across mobile, backend, and integration specialists.
+ """)
+
+
+async def interactive_demo():
+ """Run an interactive demo where users can experiment with dynamic agents."""
+ async with fast.run() as agent:
+ print("\n=== Interactive Project Manager Demo ===")
+ print("You can now interact with the project manager!")
+ print("Try commands like:")
+ print("- 'Create a microservices architecture for an e-commerce platform'")
+ print("- 'Build a data analysis pipeline with Python and Apache Spark'")
+ print("- 'Set up a CI/CD pipeline for a React application'")
+ print("- Type 'exit' to quit\n")
+
+ await agent.interactive()
+
+
+if __name__ == "__main__":
+ import sys
+
+ if len(sys.argv) > 1 and sys.argv[1] == "interactive":
+ asyncio.run(interactive_demo())
+ else:
+ asyncio.run(main())
diff --git a/examples/dynamic-agents/simple_demo.py b/examples/dynamic-agents/simple_demo.py
new file mode 100644
index 00000000..a6f1eaec
--- /dev/null
+++ b/examples/dynamic-agents/simple_demo.py
@@ -0,0 +1,91 @@
+"""
+Simple Dynamic Agents Demo
+
+This example demonstrates the basic functionality of dynamic agents with
+simple use cases that are easy to understand and modify.
+"""
+
+import asyncio
+
+from mcp_agent.core.fastagent import FastAgent
+
+# Create the application
+fast = FastAgent("Simple Dynamic Agents Demo")
+
+
+@fast.agent(
+ name="simple_creator",
+ instruction="""You are a simple agent that demonstrates basic dynamic agent creation.
+
+You can create other agents and delegate simple tasks to them.
+Show how to create, use, and manage dynamic agents step by step.
+Be clear about what you're doing and explain each step.""",
+ servers=["filesystem"],
+ dynamic_agents=True,
+ max_dynamic_agents=3,
+ model="haiku",
+)
+async def simple_example():
+ async with fast.run() as agent:
+ print("=== Simple Dynamic Agent Example ===\n")
+
+ await agent.simple_creator("""
+ Please demonstrate the dynamic agent system by:
+ 1. Creating a file organizer agent that can read and organize files
+ 2. Creating a content writer agent that can write documentation
+ 3. List your active agents
+ 4. Have the file organizer create a project structure
+ 5. Have the content writer create a README file
+ 6. Show the results of their work
+ 7. Clean up by terminating the agents
+
+ Walk me through each step clearly.
+ """)
+
+
+@fast.agent(
+ name="task_delegator",
+ instruction="""You are a task delegation specialist. You break down complex tasks
+ into smaller pieces and create specialized agents to handle each piece.
+
+ Focus on clear task division and coordination between agents.""",
+ servers=["filesystem"],
+ dynamic_agents=True,
+ max_dynamic_agents=4,
+ model="haiku",
+)
+async def delegation_example():
+ async with fast.run() as agent:
+ print("\n=== Task Delegation Example ===\n")
+
+ await agent.task_delegator("""
+ I need to analyze a Python project and create documentation for it.
+
+ Please:
+ 1. Create a code analyzer agent to examine the project structure
+ 2. Create a documentation writer agent to write technical docs
+ 3. Create a readme generator agent to create user-friendly documentation
+ 4. Coordinate their work to produce comprehensive project documentation
+
+ Show how you delegate tasks and combine their results.
+ """)
+
+
+async def run_all_examples():
+ """Run all simple examples in sequence."""
+ print("Running Simple Dynamic Agents Examples...\n")
+
+ await simple_example()
+ # print("\n" + "="*60 + "\n")
+ # await delegation_example()
+
+
+if __name__ == "__main__":
+ import sys
+
+ if len(sys.argv) > 1 and sys.argv[1] == "delegation":
+ asyncio.run(delegation_example())
+ elif len(sys.argv) > 1 and sys.argv[1] == "basic":
+ asyncio.run(simple_example())
+ else:
+ asyncio.run(run_all_examples())
diff --git a/src/fast_agent/agents/agent_types.py b/src/fast_agent/agents/agent_types.py
index fe7c9ba8..cbeb0306 100644
--- a/src/fast_agent/agents/agent_types.py
+++ b/src/fast_agent/agents/agent_types.py
@@ -40,6 +40,8 @@ class AgentConfig:
use_history: bool = True
default_request_params: RequestParams | None = None
human_input: bool = False
+ dynamic_agents: bool = False
+ max_dynamic_agents: int = 5
agent_type: AgentType = AgentType.BASIC
default: bool = False
elicitation_handler: ElicitationFnT | None = None
diff --git a/src/mcp_agent/agents/dynamic_agent_manager.py b/src/mcp_agent/agents/dynamic_agent_manager.py
new file mode 100644
index 00000000..b1733586
--- /dev/null
+++ b/src/mcp_agent/agents/dynamic_agent_manager.py
@@ -0,0 +1,425 @@
+"""
+Dynamic Agent Manager for creating and managing agents at runtime.
+
+This manager handles the lifecycle of dynamic agents, following the same patterns
+as parallel agents for execution and communication.
+"""
+
+import asyncio
+import uuid
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Dict, List, Optional
+
+from pydantic import BaseModel, Field
+
+from a2a.types import AgentCard
+
+from mcp_agent.agents.agent import Agent
+from fast_agent.agents.agent_types import AgentConfig, AgentType
+from mcp_agent.core.direct_factory import get_model_factory
+from mcp_agent.core.prompt import Prompt
+from mcp_agent.logging.logger import get_logger
+
+if TYPE_CHECKING:
+ from mcp_agent.agents.base_agent import BaseAgent
+
+logger = get_logger(__name__)
+
+
+@dataclass
+class DynamicAgentSpec:
+ """Specification for creating a dynamic agent."""
+ name: str
+ instruction: str
+ servers: List[str]
+ tools: Optional[Dict[str, List[str]]] = None
+ model: Optional[str] = None
+
+
+def create_dynamic_agent_card(
+ agent_id: str,
+ name: str,
+ description: str,
+ servers: List[str],
+ status: str = "active",
+ context_tokens_used: int = 0,
+ last_activity: Optional[str] = None
+) -> AgentCard:
+ """Create an AgentCard for a dynamic agent."""
+ from a2a.types import AgentCapabilities, AgentSkill
+
+ # Create skills from servers
+ skills = []
+ for server in servers:
+ skills.append(AgentSkill(
+ id=f"mcp_{server}",
+ name=f"mcp_{server}",
+ description=f"Access to {server} MCP server",
+ tags=["mcp", "server", server]
+ ))
+
+ # Add status and metadata as additional skills
+ skills.append(AgentSkill(
+ id="agent_status",
+ name="agent_status",
+ description=f"Agent status: {status}",
+ tags=["status", "metadata"]
+ ))
+
+ if context_tokens_used > 0:
+ skills.append(AgentSkill(
+ id="usage_info",
+ name="usage_info",
+ description=f"Context tokens used: {context_tokens_used}",
+ tags=["usage", "metadata"]
+ ))
+
+ return AgentCard(
+ name=name,
+ description=description,
+ url=f"fast-agent://dynamic-agents/{agent_id}/",
+ version="0.1",
+ capabilities=AgentCapabilities(
+ supportsStreaming=False,
+ supportsFunctionCalling=True,
+ supportsToolUse=True
+ ),
+ defaultInputModes=["text/plain"],
+ defaultOutputModes=["text/plain"],
+ skills=skills,
+ provider=None,
+ documentationUrl=None
+ )
+
+
+class DynamicAgentManager:
+ """
+ Manages dynamic agents for a parent agent.
+
+ Follows the same patterns as ParallelAgent for execution and communication,
+ but allows creating agents at runtime based on task needs.
+ """
+
+ def __init__(self, parent_agent: "BaseAgent") -> None:
+ """
+ Initialize the dynamic agent manager.
+
+ Args:
+ parent_agent: The agent that owns this manager
+ """
+ self.parent_agent = parent_agent
+ self.dynamic_agents: Dict[str, Agent] = {}
+ self.max_agents = parent_agent.config.max_dynamic_agents
+ self.logger = get_logger(f"{__name__}.{parent_agent.name}")
+
+ async def create_agent(self, spec: DynamicAgentSpec) -> str:
+ """
+ Create a new dynamic agent.
+
+ Args:
+ spec: Specification for the agent to create
+
+ Returns:
+ agent_id: Unique identifier for the created agent
+
+ Raises:
+ ValueError: If max agents limit reached or invalid specification
+ """
+ # Check limits
+ if len(self.dynamic_agents) >= self.max_agents:
+ raise ValueError(f"Maximum number of dynamic agents ({self.max_agents}) reached")
+
+ # Validate servers exist in parent's context
+ available_servers = self.parent_agent.server_names
+ invalid_servers = set(spec.servers) - set(available_servers)
+ if invalid_servers:
+ raise ValueError(f"Invalid servers: {invalid_servers}. Available: {available_servers}")
+
+ # Generate unique agent ID
+ agent_id = f"{spec.name}_{uuid.uuid4().hex[:6]}"
+
+ # Create agent config
+ config = AgentConfig(
+ name=spec.name,
+ instruction=spec.instruction,
+ servers=spec.servers,
+ tools=spec.tools,
+ model=spec.model or self.parent_agent.config.model,
+ use_history=True, # Each dynamic agent has its own context
+ agent_type=AgentType.BASIC
+ )
+
+ # Create the agent using existing patterns
+ agent = Agent(
+ config=config,
+ context=self.parent_agent._context # Share context for MCP connections
+ )
+
+ # Initialize the agent
+ await agent.initialize()
+
+ # Attach LLM using the same process as factory
+ model_factory = get_model_factory(
+ context=self.parent_agent._context,
+ model=config.model,
+ default_model=self.parent_agent.config.model
+ )
+
+ await agent.attach_llm(
+ model_factory,
+ request_params=config.default_request_params,
+ api_key=config.api_key
+ )
+
+ # Store the agent
+ self.dynamic_agents[agent_id] = agent
+
+ self.logger.info(
+ f"Created dynamic agent '{spec.name}' with ID {agent_id}",
+ data={
+ "agent_id": agent_id,
+ "name": spec.name,
+ "servers": spec.servers,
+ "total_agents": len(self.dynamic_agents)
+ }
+ )
+
+ return agent_id
+
+ async def terminate_agent(self, agent_id: str) -> bool:
+ """
+ Terminate a dynamic agent and clean up resources.
+
+ Args:
+ agent_id: ID of the agent to terminate
+
+ Returns:
+ success: True if terminated successfully
+ """
+ if agent_id not in self.dynamic_agents:
+ return False
+
+ agent = self.dynamic_agents[agent_id]
+
+ try:
+ await agent.shutdown()
+ del self.dynamic_agents[agent_id]
+
+ self.logger.info(
+ f"Terminated dynamic agent {agent_id}",
+ data={
+ "agent_id": agent_id,
+ "remaining_agents": len(self.dynamic_agents)
+ }
+ )
+ return True
+
+ except Exception as e:
+ self.logger.error(f"Error terminating agent {agent_id}: {e}")
+ return False
+
+ async def send_to_agent(self, agent_id: str, message: str) -> str:
+ """
+ Send a message to a specific dynamic agent.
+
+ Args:
+ agent_id: ID of the agent to send to
+ message: Message to send
+
+ Returns:
+ response: The agent's response
+
+ Raises:
+ ValueError: If agent_id not found
+ """
+ if agent_id not in self.dynamic_agents:
+ raise ValueError(f"Agent {agent_id} not found")
+
+ agent = self.dynamic_agents[agent_id]
+ response = await agent.send(message)
+
+ self.logger.debug(
+ f"Sent message to agent {agent_id}",
+ data={"agent_id": agent_id, "message_length": len(message)}
+ )
+
+ return response
+
+ async def broadcast_message(
+ self,
+ message: str,
+ agent_ids: Optional[List[str]] = None,
+ parallel: bool = True
+ ) -> Dict[str, str]:
+ """
+ Send a message to multiple dynamic agents.
+
+ Uses the EXACT same parallel execution pattern as ParallelAgent.
+
+ Args:
+ message: Message to send to agents
+ agent_ids: Specific agents to send to (if None, sends to all)
+ parallel: Execute in parallel (True) or sequential (False)
+
+ Returns:
+ responses: Dict mapping agent_id to response
+ """
+ # Get agents to execute
+ if agent_ids:
+ # Validate all agent IDs exist
+ missing_ids = set(agent_ids) - set(self.dynamic_agents.keys())
+ if missing_ids:
+ raise ValueError(f"Agent IDs not found: {missing_ids}")
+ agents_to_execute = [(id, self.dynamic_agents[id]) for id in agent_ids]
+ else:
+ agents_to_execute = list(self.dynamic_agents.items())
+
+ if not agents_to_execute:
+ return {}
+
+ # Create prompt message
+ prompt_message = [Prompt.user(message)]
+
+ if parallel:
+ # Execute in parallel - SAME as ParallelAgent
+ responses = await asyncio.gather(
+ *[agent.generate(prompt_message) for _, agent in agents_to_execute],
+ return_exceptions=True
+ )
+ else:
+ # Execute sequentially
+ responses = []
+ for _, agent in agents_to_execute:
+ try:
+ response = await agent.generate(prompt_message)
+ responses.append(response)
+ except Exception as e:
+ responses.append(e)
+
+ # Process responses
+ result = {}
+ for i, (agent_id, agent) in enumerate(agents_to_execute):
+ response = responses[i]
+ if isinstance(response, Exception):
+ result[agent_id] = f"Error: {str(response)}"
+ else:
+ result[agent_id] = response.all_text()
+
+ self.logger.info(
+ f"Broadcast message to {len(agents_to_execute)} agents",
+ data={
+ "agent_count": len(agents_to_execute),
+ "parallel": parallel,
+ "message_length": len(message)
+ }
+ )
+
+ # Display results if console display is available
+ if len(result) > 1: # Only show tree view for multiple agents
+ self._show_agent_results(result, message)
+
+ return result
+
+ def _show_agent_results(self, responses: Dict[str, str], original_message: str = None) -> None:
+ """Show dynamic agent results using console display."""
+ try:
+ # Import here to avoid circular dependencies
+ from mcp_agent.ui.console_display import ConsoleDisplay
+
+ # Try to get display from parent agent
+ display = None
+ if hasattr(self.parent_agent, '_context') and self.parent_agent._context:
+ display = getattr(self.parent_agent._context, 'display', None)
+
+ # Create display if not available
+ if not display:
+ config = getattr(self.parent_agent, 'config', None)
+ display = ConsoleDisplay(config)
+
+ # Show results using the same pattern as parallel agents
+ display.show_dynamic_agent_results(responses, original_message)
+
+ except Exception as e:
+ # Silently fail if display not available
+ self.logger.debug(f"Could not display dynamic agent results: {e}")
+
+ def list_agents(self) -> List[AgentCard]:
+ """
+ List all active dynamic agents as AgentCard objects.
+
+ Returns:
+ agents: List of agent cards
+ """
+ result = []
+ for agent_id, agent in self.dynamic_agents.items():
+ # Get token usage if available
+ tokens_used = 0
+ if hasattr(agent, 'usage_accumulator') and agent.usage_accumulator:
+ summary = agent.usage_accumulator.get_summary()
+ tokens_used = summary.get('cumulative_input_tokens', 0) + summary.get('cumulative_output_tokens', 0)
+
+ card = create_dynamic_agent_card(
+ agent_id=agent_id,
+ name=agent.name,
+ description=agent.instruction,
+ servers=agent.config.servers,
+ status="active",
+ context_tokens_used=tokens_used
+ )
+ result.append(card)
+
+ return result
+
+ def get_agent(self, agent_id: str) -> Optional[Agent]:
+ """
+ Get a dynamic agent by ID.
+
+ Args:
+ agent_id: ID of the agent to retrieve
+
+ Returns:
+ agent: The agent instance or None if not found
+ """
+ return self.dynamic_agents.get(agent_id)
+
+ async def shutdown_all(self) -> None:
+ """
+ Shutdown all dynamic agents and clean up resources.
+ """
+ agent_ids = list(self.dynamic_agents.keys())
+ for agent_id in agent_ids:
+ await self.terminate_agent(agent_id)
+
+ self.logger.info("Shutdown all dynamic agents")
+
+ def format_responses_for_aggregation(
+ self,
+ responses: Dict[str, str],
+ original_message: Optional[str] = None
+ ) -> str:
+ """
+ Format dynamic agent responses for aggregation - SAME format as ParallelAgent.
+
+ Args:
+ responses: Dict mapping agent_id to response
+ original_message: The original message sent to agents
+
+ Returns:
+ formatted: Formatted string for aggregation
+ """
+ formatted = []
+
+ # Include the original message if provided
+ if original_message:
+ formatted.append("The following request was sent to the dynamic agents:")
+ formatted.append(f"\n{original_message}\n")
+
+ # Format each agent's response - SAME format as ParallelAgent
+ for agent_id, response in responses.items():
+ agent = self.dynamic_agents.get(agent_id)
+ agent_name = agent.name if agent else agent_id
+ formatted.append(
+ f'\n{response}\n'
+ )
+
+ return "\n\n".join(formatted)
\ No newline at end of file
diff --git a/src/mcp_agent/core/agent_app.py b/src/mcp_agent/core/agent_app.py
index 7520d626..3b4faab6 100644
--- a/src/mcp_agent/core/agent_app.py
+++ b/src/mcp_agent/core/agent_app.py
@@ -334,6 +334,9 @@ def _show_turn_usage(self, agent_name: str) -> None:
# Check if this is a parallel agent
if agent.agent_type == AgentType.PARALLEL:
self._show_parallel_agent_usage(agent)
+ # Check if this agent has dynamic agents
+ elif hasattr(agent, 'dynamic_agent_manager') and agent.dynamic_agent_manager and agent.dynamic_agent_manager.dynamic_agents:
+ self._show_dynamic_agent_usage(agent)
else:
self._show_regular_agent_usage(agent)
@@ -391,6 +394,55 @@ def _show_parallel_agent_usage(self, parallel_agent) -> None:
f"[dim] {prefix} {usage_data['name']}: {usage_data['display_text']}[/dim]{usage_data['cache_suffix']}"
)
+ def _show_dynamic_agent_usage(self, parent_agent) -> None:
+ """Show usage for dynamic agents created by a parent agent."""
+ if not hasattr(parent_agent, 'dynamic_agent_manager') or not parent_agent.dynamic_agent_manager:
+ return
+
+ # Collect usage from all dynamic agents
+ child_usage_data = []
+ total_input = 0
+ total_output = 0
+ total_tool_calls = 0
+
+ # Get usage from dynamic agents
+ for agent_id, agent in parent_agent.dynamic_agent_manager.dynamic_agents.items():
+ if agent:
+ usage_info = self._format_agent_usage(agent)
+ if usage_info:
+ # Extract agent name from agent_id (format: name_hexid)
+ agent_name = agent_id.rsplit('_', 1)[0] if '_' in agent_id else agent_id
+ child_usage_data.append({**usage_info, "name": agent_name})
+ total_input += usage_info["input_tokens"]
+ total_output += usage_info["output_tokens"]
+ total_tool_calls += usage_info["tool_calls"]
+
+ # Also show parent agent's own usage
+ parent_usage = self._format_agent_usage(parent_agent)
+ if parent_usage:
+ with progress_display.paused():
+ rich_print(
+ f"[dim]Last turn: {parent_usage['display_text']}[/dim]{parent_usage['cache_suffix']}"
+ )
+
+ if not child_usage_data:
+ return
+
+ # Show aggregated usage for dynamic agents
+ with progress_display.paused():
+ tool_info = f", {total_tool_calls} tool calls" if total_tool_calls > 0 else ""
+ rich_print(
+ f"[dim]Dynamic agents: {total_input:,} Input, {total_output:,} Output{tool_info}[/dim]"
+ )
+
+ # Show individual dynamic agent usage
+ for i, usage_data in enumerate(child_usage_data):
+ is_last = i == len(child_usage_data) - 1
+ prefix = "└─" if is_last else "├─"
+ rich_print(
+ f"[dim] {prefix} {usage_data['name']}: {usage_data['display_text']}[/dim]{usage_data['cache_suffix']}"
+ )
+
def _format_agent_usage(self, agent) -> Optional[Dict]:
"""Format usage information for a single agent."""
if not agent or not agent.usage_accumulator:
diff --git a/src/mcp_agent/core/direct_decorators.py b/src/mcp_agent/core/direct_decorators.py
index 67c82536..aabb55ab 100644
--- a/src/mcp_agent/core/direct_decorators.py
+++ b/src/mcp_agent/core/direct_decorators.py
@@ -234,6 +234,8 @@ def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
model=model,
use_history=use_history,
human_input=human_input,
+ dynamic_agents=extra_kwargs.get("dynamic_agents", False),
+ max_dynamic_agents=extra_kwargs.get("max_dynamic_agents", 5),
default=default,
elicitation_handler=extra_kwargs.get("elicitation_handler"),
api_key=extra_kwargs.get("api_key"),
@@ -282,6 +284,8 @@ def agent(
use_history: bool = True,
request_params: RequestParams | None = None,
human_input: bool = False,
+ dynamic_agents: bool = False,
+ max_dynamic_agents: int = 5,
default: bool = False,
elicitation_handler: Optional[ElicitationFnT] = None,
api_key: str | None = None,
@@ -301,6 +305,8 @@ def agent(
use_history: Whether to maintain conversation history
request_params: Additional request parameters for the LLM
human_input: Whether to enable human input capabilities
+ dynamic_agents: Whether to enable dynamic agent creation capabilities
+ max_dynamic_agents: Maximum number of dynamic agents this agent can create
default: Whether to mark this as the default agent
elicitation_handler: Custom elicitation handler function (ElicitationFnT)
api_key: Optional API key for the LLM provider
@@ -323,6 +329,8 @@ def agent(
use_history=use_history,
request_params=request_params,
human_input=human_input,
+ dynamic_agents=dynamic_agents,
+ max_dynamic_agents=max_dynamic_agents,
default=default,
elicitation_handler=elicitation_handler,
tools=tools,
diff --git a/tests/unit/fast_agent/agents/test_dynamic_agent_manager.py b/tests/unit/fast_agent/agents/test_dynamic_agent_manager.py
new file mode 100644
index 00000000..611676f6
--- /dev/null
+++ b/tests/unit/fast_agent/agents/test_dynamic_agent_manager.py
@@ -0,0 +1,387 @@
+"""
+Unit tests for DynamicAgentManager.
+
+These tests verify the core functionality of dynamic agent creation,
+lifecycle management, and communication.
+"""
+
+from unittest.mock import AsyncMock, Mock, patch
+
+import pytest
+
+from mcp_agent.agents.dynamic_agent_manager import (
+ DynamicAgentManager,
+ DynamicAgentSpec,
+ create_dynamic_agent_card,
+)
+from fast_agent.agents.agent_types import AgentConfig
+
+
+class TestDynamicAgentSpec:
+ """Test the DynamicAgentSpec dataclass."""
+
+ def test_spec_creation(self):
+ """Test creating a DynamicAgentSpec."""
+ spec = DynamicAgentSpec(
+ name="test_agent",
+ instruction="You are a test agent",
+ servers=["filesystem"]
+ )
+
+ assert spec.name == "test_agent"
+ assert spec.instruction == "You are a test agent"
+ assert spec.servers == ["filesystem"]
+ assert spec.tools is None
+ assert spec.model is None
+
+ def test_spec_with_optional_fields(self):
+ """Test creating a DynamicAgentSpec with optional fields."""
+ spec = DynamicAgentSpec(
+ name="test_agent",
+ instruction="You are a test agent",
+ servers=["filesystem", "fetch"],
+ tools={"filesystem": ["read*", "write*"]},
+ model="haiku"
+ )
+
+ assert spec.tools == {"filesystem": ["read*", "write*"]}
+ assert spec.model == "haiku"
+
+
+class TestCreateDynamicAgentCard:
+ """Test the create_dynamic_agent_card function."""
+
+ def test_card_creation(self):
+ """Test creating an AgentCard for a dynamic agent."""
+ card = create_dynamic_agent_card(
+ agent_id="test_123",
+ name="test_agent",
+ description="A test agent",
+ servers=["filesystem", "fetch"]
+ )
+
+ assert card.name == "test_agent"
+ assert card.description == "A test agent"
+ assert card.url == "fast-agent://dynamic-agents/test_123/"
+ assert card.version == "0.1"
+ assert len(card.skills) == 3 # 2 servers + 1 status skill
+ assert card.defaultInputModes == ["text/plain"]
+ assert card.defaultOutputModes == ["text/plain"]
+
+ def test_card_with_usage_info(self):
+ """Test creating an AgentCard with usage information."""
+ card = create_dynamic_agent_card(
+ agent_id="test_456",
+ name="usage_agent",
+ description="An agent with usage tracking",
+ servers=["filesystem"],
+ context_tokens_used=1500
+ )
+
+ assert card.name == "usage_agent"
+ assert len(card.skills) == 3 # 1 server + 1 status + 1 usage skill
+ # Check that usage skill is included
+ usage_skills = [skill for skill in card.skills if skill.name == "usage_info"]
+ assert len(usage_skills) == 1
+ assert "1500" in usage_skills[0].description
+
+
+class TestDynamicAgentManager:
+ """Test the DynamicAgentManager class."""
+
+ def setup_method(self):
+ """Set up test fixtures."""
+ # Create a mock parent agent
+ self.mock_parent = Mock()
+ self.mock_parent.config = AgentConfig(
+ name="parent_agent",
+ max_dynamic_agents=3,
+ model="haiku"
+ )
+ self.mock_parent.server_names = ["filesystem", "fetch"]
+ self.mock_parent._context = Mock()
+ self.mock_parent.name = "parent_agent"
+
+ # Create the manager
+ self.manager = DynamicAgentManager(self.mock_parent)
+
+ def test_initialization(self):
+ """Test manager initialization."""
+ assert self.manager.parent_agent == self.mock_parent
+ assert self.manager.max_agents == 3
+ assert len(self.manager.dynamic_agents) == 0
+
+ @pytest.mark.asyncio
+ async def test_create_agent_basic(self):
+ """Test basic agent creation."""
+ spec = DynamicAgentSpec(
+ name="test_agent",
+ instruction="You are a test agent",
+ servers=["filesystem"]
+ )
+
+ with patch('mcp_agent.agents.dynamic_agent_manager.Agent') as mock_agent_class, \
+ patch('mcp_agent.agents.dynamic_agent_manager.get_model_factory') as mock_factory:
+
+ # Setup mocks
+ mock_agent = AsyncMock()
+ mock_agent_class.return_value = mock_agent
+ mock_factory.return_value = Mock()
+
+ # Create agent
+ agent_id = await self.manager.create_agent(spec)
+
+ # Verify
+ assert agent_id.startswith("test_agent_")
+ assert len(agent_id) == len("test_agent_") + 6 # name + underscore + 6 hex chars
+ assert agent_id in self.manager.dynamic_agents
+
+ # Verify agent was created with correct config
+ mock_agent_class.assert_called_once()
+ config_arg = mock_agent_class.call_args[1]['config']
+ assert config_arg.name == "test_agent"
+ assert config_arg.instruction == "You are a test agent"
+ assert config_arg.servers == ["filesystem"]
+
+ # Verify agent was initialized and LLM attached
+ mock_agent.initialize.assert_called_once()
+ mock_agent.attach_llm.assert_called_once()
+
+ @pytest.mark.asyncio
+ async def test_create_agent_max_limit(self):
+ """Test agent creation fails when max limit reached."""
+ # Fill up to max capacity
+ for i in range(3):
+ agent_id = f"agent_{i}_abcdef"
+ mock_agent = Mock()
+ self.manager.dynamic_agents[agent_id] = mock_agent
+
+ # Try to create one more
+ spec = DynamicAgentSpec(
+ name="overflow_agent",
+ instruction="This should fail",
+ servers=["filesystem"]
+ )
+
+ with pytest.raises(ValueError, match="Maximum number of dynamic agents"):
+ await self.manager.create_agent(spec)
+
+ @pytest.mark.asyncio
+ async def test_create_agent_invalid_servers(self):
+ """Test agent creation fails with invalid servers."""
+ spec = DynamicAgentSpec(
+ name="test_agent",
+ instruction="You are a test agent",
+ servers=["invalid_server"]
+ )
+
+ with pytest.raises(ValueError, match="Invalid servers"):
+ await self.manager.create_agent(spec)
+
+ @pytest.mark.asyncio
+ async def test_terminate_agent(self):
+ """Test agent termination."""
+ # Add a mock agent
+ agent_id = "test_123"
+ mock_agent = AsyncMock()
+ self.manager.dynamic_agents[agent_id] = mock_agent
+
+ # Terminate it
+ result = await self.manager.terminate_agent(agent_id)
+
+ # Verify
+ assert result is True
+ assert agent_id not in self.manager.dynamic_agents
+ mock_agent.shutdown.assert_called_once()
+
+ @pytest.mark.asyncio
+ async def test_terminate_nonexistent_agent(self):
+ """Test terminating a non-existent agent."""
+ result = await self.manager.terminate_agent("nonexistent")
+ assert result is False
+
+ @pytest.mark.asyncio
+ async def test_send_to_agent(self):
+ """Test sending message to specific agent."""
+ # Add a mock agent
+ agent_id = "test_123"
+ mock_agent = AsyncMock()
+ mock_agent.send.return_value = "Agent response"
+ self.manager.dynamic_agents[agent_id] = mock_agent
+
+ # Send message
+ response = await self.manager.send_to_agent(agent_id, "Test message")
+
+ # Verify
+ assert response == "Agent response"
+ mock_agent.send.assert_called_once_with("Test message")
+
+ @pytest.mark.asyncio
+ async def test_send_to_nonexistent_agent(self):
+ """Test sending to non-existent agent fails."""
+ with pytest.raises(ValueError, match="Agent nonexistent not found"):
+ await self.manager.send_to_agent("nonexistent", "Test message")
+
+ @pytest.mark.asyncio
+ async def test_broadcast_message_parallel(self):
+ """Test broadcasting message to multiple agents in parallel."""
+ # Add mock agents
+ agents = {}
+ for i in range(3):
+ agent_id = f"agent_{i}"
+ mock_agent = AsyncMock()
+ mock_response = Mock()
+ mock_response.all_text.return_value = f"Response from agent {i}"
+ mock_agent.generate.return_value = mock_response
+ mock_agent.name = f"agent_{i}"
+ agents[agent_id] = mock_agent
+ self.manager.dynamic_agents[agent_id] = mock_agent
+
+ # Broadcast message
+ responses = await self.manager.broadcast_message("Test broadcast", parallel=True)
+
+ # Verify all agents received the message
+ assert len(responses) == 3
+ for i, (agent_id, response) in enumerate(responses.items()):
+ assert agent_id == f"agent_{i}"
+ assert response == f"Response from agent {i}"
+ agents[agent_id].generate.assert_called_once()
+
+ @pytest.mark.asyncio
+ async def test_broadcast_message_specific_agents(self):
+ """Test broadcasting to specific agents only."""
+ # Add mock agents
+ for i in range(3):
+ agent_id = f"agent_{i}"
+ mock_agent = AsyncMock()
+ mock_response = Mock()
+ mock_response.all_text.return_value = f"Response from agent {i}"
+ mock_agent.generate.return_value = mock_response
+ mock_agent.name = f"agent_{i}"
+ self.manager.dynamic_agents[agent_id] = mock_agent
+
+ # Broadcast to specific agents only
+ target_agents = ["agent_0", "agent_2"]
+ responses = await self.manager.broadcast_message(
+ "Test broadcast",
+ agent_ids=target_agents,
+ parallel=True
+ )
+
+ # Verify only targeted agents received the message
+ assert len(responses) == 2
+ assert "agent_0" in responses
+ assert "agent_2" in responses
+ assert "agent_1" not in responses
+
+ @pytest.mark.asyncio
+ async def test_broadcast_empty_agents(self):
+ """Test broadcasting with no agents."""
+ responses = await self.manager.broadcast_message("Test broadcast")
+ assert responses == {}
+
+ def test_list_agents(self):
+ """Test listing all agents."""
+ from a2a.types import AgentCard
+
+ # Add mock agents
+ for i in range(2):
+ agent_id = f"agent_{i}"
+ mock_agent = Mock()
+ mock_agent.name = f"agent_{i}"
+ mock_agent.instruction = f"Test agent {i}"
+ mock_agent.config = Mock()
+ mock_agent.config.servers = ["filesystem"]
+ mock_agent.usage_accumulator = None
+ self.manager.dynamic_agents[agent_id] = mock_agent
+
+ # List agents
+ agents = self.manager.list_agents()
+
+ # Verify
+ assert len(agents) == 2
+ for i, card in enumerate(agents):
+ assert isinstance(card, AgentCard)
+ assert card.name == f"agent_{i}"
+ assert card.description == f"Test agent {i}"
+ assert card.url == f"fast-agent://dynamic-agents/agent_{i}/"
+ assert len(card.skills) == 2 # 1 server + 1 status skill
+
+ def test_get_agent(self):
+ """Test getting agent by ID."""
+ # Add mock agent
+ agent_id = "test_123"
+ mock_agent = Mock()
+ self.manager.dynamic_agents[agent_id] = mock_agent
+
+ # Get agent
+ result = self.manager.get_agent(agent_id)
+ assert result == mock_agent
+
+ # Get non-existent agent
+ result = self.manager.get_agent("nonexistent")
+ assert result is None
+
+ @pytest.mark.asyncio
+ async def test_shutdown_all(self):
+ """Test shutting down all agents."""
+ # Add mock agents
+ mock_agents = {}
+ for i in range(3):
+ agent_id = f"agent_{i}"
+ mock_agent = AsyncMock()
+ mock_agents[agent_id] = mock_agent
+ self.manager.dynamic_agents[agent_id] = mock_agent
+
+ # Shutdown all
+ await self.manager.shutdown_all()
+
+ # Verify all agents were terminated
+ assert len(self.manager.dynamic_agents) == 0
+ for mock_agent in mock_agents.values():
+ mock_agent.shutdown.assert_called_once()
+
+ def test_format_responses_for_aggregation(self):
+ """Test formatting responses like ParallelAgent."""
+ # Add mock agents for names
+ mock_agent_1 = Mock()
+ mock_agent_1.name = "frontend_dev"
+ mock_agent_2 = Mock()
+ mock_agent_2.name = "backend_dev"
+ self.manager.dynamic_agents["agent_1"] = mock_agent_1
+ self.manager.dynamic_agents["agent_2"] = mock_agent_2
+
+ responses = {
+ "agent_1": "Frontend component created",
+ "agent_2": "API endpoints implemented"
+ }
+
+ formatted = self.manager.format_responses_for_aggregation(
+ responses, "Build the application"
+ )
+
+ # Verify format matches ParallelAgent
+ assert "The following request was sent to the dynamic agents:" in formatted
+ assert "" in formatted
+ assert "Build the application" in formatted
+ assert "" in formatted
+ assert '' in formatted
+ assert "Frontend component created" in formatted
+ assert '' in formatted
+ assert "API endpoints implemented" in formatted
+ assert "" in formatted
+
+ def test_format_responses_without_original_message(self):
+ """Test formatting responses without original message."""
+ mock_agent = Mock()
+ mock_agent.name = "test_agent"
+ self.manager.dynamic_agents["agent_1"] = mock_agent
+
+ responses = {"agent_1": "Task completed"}
+ formatted = self.manager.format_responses_for_aggregation(responses)
+
+ # Should not include original message section
+ assert "The following request was sent" not in formatted
+ assert "" not in formatted
+ assert '' in formatted
+ assert "Task completed" in formatted
\ No newline at end of file