diff --git a/examples/mcp/ARCHITECTURE.md b/examples/mcp/ARCHITECTURE.md new file mode 100644 index 000000000..14dfc3552 --- /dev/null +++ b/examples/mcp/ARCHITECTURE.md @@ -0,0 +1,285 @@ +# Multi-Agent MCP Architecture + +This directory contains a scalable, modular architecture for building multi-agent systems with MCP servers. + +## Directory Structure + +``` +examples/mcp/ +├── __init__.py # Package exports +├── mcp_client_manager.py # MCP connection management +├── agent_factory.py # Agent creation and configuration +├── multiple_mcp_servers_example.py # Main example +├── ARCHITECTURE.md # This file +├── servers/ # MCP server implementations +│ ├── math_server.py +│ ├── text_server.py +│ ├── data_server.py +│ └── README.md +└── ...other examples... +``` + +## Core Components + +### 1. `mcp_client_manager.py` + +**Purpose**: Manages lifecycle of MCP server connections + +**Key Features**: +- Connection pooling +- Double-shielded cleanup (anyio + asyncio) +- Concurrent shutdown with timeouts +- Graceful error handling + +**Usage**: +```python +from examples.mcp import StreamableHttpClientManager + +manager = StreamableHttpClientManager() +server = await manager.start_client("my-server", "http://localhost:8000/mcp") +# ... use server ... +await manager.stop_all() +``` + +### 2. `agent_factory.py` + +**Purpose**: Creates and configures specialized agents + +**Key Features**: +- Agent templates/configurations +- Custom tool wrapping with streaming +- Easy addition of new agent types +- Orchestrator creation + +**Usage**: +```python +from examples.mcp import AgentFactory + +# Create factory with MCP servers +factory = AgentFactory( + math_server=math_server, + text_server=text_server, +) + +# Create individual agents +math_agent = factory.create_math_agent() +text_agent = factory.create_text_agent() + +# Create orchestrator +orchestrator = factory.create_orchestrator() +``` + +### 3. `multiple_mcp_servers_example.py` + +**Purpose**: Demonstrates the complete architecture + +**Shows**: +- Multi-server connection setup +- Agent creation via factory +- Streaming orchestration +- Proper cleanup + +## Adding New Agent Types + +### Step 1: Add Agent Configuration + +```python +# In agent_factory.py _setup_configs() +"code": AgentConfig( + name="Code Agent", + instructions="You are a coding assistant...", + max_turns=15, + mcp_servers=[code_server] if code_server else [], +), +``` + +### Step 2: Add Convenience Method (Optional) + +```python +# In AgentFactory class +def create_code_agent(self) -> Agent: + """Create a code assistant agent.""" + return self.create_agent("code") +``` + +### Step 3: Add to Orchestrator + +```python +# When creating orchestrator +code_agent = factory.create_code_agent() +tool_agents = [ + (code_agent, "code_operations", "Perform code-related tasks..."), + # ... other agents +] +orchestrator = factory.create_orchestrator(tool_agents=tool_agents) +``` + +## Scaling to 100+ Agents + +The architecture is designed for scale: + +### Dynamic Agent Registration + +```python +# Add agents at runtime +from examples.mcp import AgentConfig + +for i in range(100): + config = AgentConfig( + name=f"Agent {i}", + instructions=f"You are agent number {i}...", + max_turns=10, + mcp_servers=[servers[i % len(servers)]] + ) + factory.add_agent_config(f"agent_{i}", config) +``` + +### Agent Groups + +```python +# Group related agents +data_agents = [ + factory.create_agent(f"data_agent_{i}") + for i in range(10) +] + +analysis_agents = [ + factory.create_agent(f"analysis_agent_{i}") + for i in range(10) +] +``` + +### Hierarchical Orchestration + +```python +# Create sub-orchestrators for different domains +data_orchestrator = factory.create_orchestrator( + tool_agents=[(agent, f"data_op_{i}", "...") for i, agent in enumerate(data_agents)] +) + +analysis_orchestrator = factory.create_orchestrator( + tool_agents=[(agent, f"analysis_op_{i}", "...") for i, agent in enumerate(analysis_agents)] +) + +# Top-level orchestrator coordinates sub-orchestrators +main_orchestrator = factory.create_orchestrator( + tool_agents=[ + (data_orchestrator, "data_operations", "Handle data tasks"), + (analysis_orchestrator, "analysis_operations", "Handle analysis tasks"), + ] +) +``` + +## Best Practices + +### 1. Connection Management + +- Always use `StreamableHttpClientManager` for multiple servers +- Set appropriate timeouts for your use case +- Use `stop_all()` in a finally block for cleanup + +### 2. Agent Organization + +- Group related agents by domain +- Use descriptive names and instructions +- Set appropriate `max_turns` per agent type + +### 3. Error Handling + +- Leverage the built-in shielded cleanup +- Suppress cleanup errors (they're harmless) +- Add custom error handling for business logic + +### 4. Performance + +- MCP servers can be shared across multiple agents +- Connection pooling is automatic +- Cleanup happens concurrently + +## Common Patterns + +### Pattern 1: Specialized Agent Pool + +```python +# Create a pool of specialized agents +agents = { + "math": factory.create_math_agent(), + "text": factory.create_text_agent(), + "data": factory.create_data_agent(), +} + +# Use based on task type +task_type = determine_task_type(user_input) +agent = agents[task_type] +result = await Runner.run(agent, user_input) +``` + +### Pattern 2: Dynamic Agent Selection + +```python +# Let orchestrator choose the right agent +orchestrator = factory.create_orchestrator() +result = await Runner.run(orchestrator, user_input) +# Orchestrator automatically delegates to specialized agents +``` + +### Pattern 3: Pipeline Processing + +```python +# Chain agents for multi-step processing +async def process_pipeline(data): + # Step 1: Data cleaning + data_agent = factory.create_data_agent() + cleaned = await Runner.run(data_agent, f"Clean this data: {data}") + + # Step 2: Analysis + analysis_agent = factory.create_agent("analysis") + analyzed = await Runner.run(analysis_agent, f"Analyze: {cleaned.final_output}") + + # Step 3: Reporting + report_agent = factory.create_agent("report") + report = await Runner.run(report_agent, f"Create report: {analyzed.final_output}") + + return report.final_output +``` + +## Troubleshooting + +### Issue: Cancel Scope Errors + +**Solution**: The architecture includes double-shielded cleanup and logger suppression. Make sure you're using the latest version of the managers. + +### Issue: Connection Timeouts + +**Solution**: Increase timeout values when creating clients: +```python +server = await manager.start_client( + "my-server", + url, + timeout=30.0, # Increase from default 5.0 + sse_read_timeout=600.0 # Increase from default 300.0 +) +``` + +### Issue: Too Many Open Connections + +**Solution**: Reuse MCP server connections across agents: +```python +# Good: Share one server across many agents +math_server = await manager.start_client("math", url) +for i in range(100): + agent = AgentConfig(..., mcp_servers=[math_server]) + +# Bad: Create separate connection per agent +# (Don't do this) +``` + +## Future Enhancements + +Potential additions to the architecture: + +1. **Agent Registry**: Central registry for discovering available agents +2. **Load Balancing**: Distribute work across multiple MCP servers +3. **Caching**: Cache agent responses for common queries +4. **Monitoring**: Built-in metrics and tracing +5. **Configuration Files**: YAML/JSON configs for agent definitions diff --git a/examples/mcp/README.md b/examples/mcp/README.md new file mode 100644 index 000000000..3a91e91ba --- /dev/null +++ b/examples/mcp/README.md @@ -0,0 +1,186 @@ +# Multi-Agent MCP System + +A scalable, modular architecture for building multi-agent systems with MCP (Model Context Protocol) servers. + +## Quick Start + +```bash +# 1. Start MCP servers (in separate terminals) +uv run python examples/mcp/servers/math_server.py +uv run python examples/mcp/servers/text_server.py +uv run python examples/mcp/servers/data_server.py + +# 2. Run the main example +uv run --env-file .env python examples/mcp/multiple_mcp_servers_example.py +``` + +## Architecture + +The system is built with three core components: + +### 1. **MCP Client Manager** (`mcp_client_manager.py`) +Manages MCP server connections with robust cleanup. + +```python +from examples.mcp import StreamableHttpClientManager + +manager = StreamableHttpClientManager() +server = await manager.start_client("math", "http://localhost:8001/mcp") +await manager.stop_all() # Automatic cleanup +``` + +### 2. **Agent Factory** (`agent_factory.py`) +Creates and configures specialized agents. + +```python +from examples.mcp import AgentFactory + +factory = AgentFactory( + math_server=math_server, + text_server=text_server +) + +# Create individual agents +math_agent = factory.create_math_agent() + +# Or create an orchestrator +orchestrator = factory.create_orchestrator() +``` + +### 3. **Examples** +- `multiple_mcp_servers_example.py` - Complete demo +- `adding_agents_example.py` - How to add new agents + +## Key Features + +✅ **Scalable** - Easily add 10-100+ agents +✅ **Modular** - Clean separation of concerns +✅ **Robust** - Double-shielded cleanup prevents errors +✅ **Streaming** - Real-time response streaming +✅ **Reusable** - Share MCP connections across agents + +## Adding New Agents + +```python +from examples.mcp import AgentConfig + +# Define new agent +config = AgentConfig( + name="Code Review Agent", + instructions="You are a code review specialist...", + max_turns=15, + mcp_servers=[server] +) + +# Add to factory +factory.add_agent_config("code_review", config) + +# Use it +agent = factory.create_agent("code_review") +``` + +## File Structure + +``` +examples/mcp/ +├── README.md # This file +├── ARCHITECTURE.md # Detailed architecture docs +├── mcp_client_manager.py # Connection management +├── agent_factory.py # Agent creation +├── multiple_mcp_servers_example.py # Main demo +├── adding_agents_example.py # Scaling example +└── servers/ # MCP server implementations + ├── math_server.py + ├── text_server.py + └── data_server.py +``` + +## Documentation + +- **[ARCHITECTURE.md](ARCHITECTURE.md)** - Complete architecture guide +- **[servers/README.md](servers/README.md)** - Server setup and usage + +## Common Use Cases + +### Use Case 1: Multi-Domain Assistant + +```python +# Create domain-specific agents +finance_agent = factory.create_agent("finance") +legal_agent = factory.create_agent("legal") +hr_agent = factory.create_agent("hr") + +# Let orchestrator route to the right agent +orchestrator = factory.create_orchestrator( + tool_agents=[ + (finance_agent, "finance_help", "Financial advice"), + (legal_agent, "legal_help", "Legal guidance"), + (hr_agent, "hr_help", "HR support"), + ] +) + +result = await Runner.run(orchestrator, user_query) +``` + +### Use Case 2: Processing Pipeline + +```python +# Chain specialized agents +data_cleaned = await Runner.run(data_agent, f"Clean: {raw_data}") +data_analyzed = await Runner.run(analysis_agent, f"Analyze: {data_cleaned.final_output}") +report = await Runner.run(report_agent, f"Report on: {data_analyzed.final_output}") +``` + +### Use Case 3: Agent Pool + +```python +# Create pool of similar agents for load distribution +agents = [factory.create_agent("analyzer") for _ in range(10)] + +# Round-robin or random selection +agent = agents[request_id % len(agents)] +result = await Runner.run(agent, task) +``` + +## Troubleshooting + +### "Error cleaning up server" messages + +These are harmless and suppressed by the architecture. Add this to your main: + +```python +import logging +logging.getLogger("agents.mcp.server").setLevel(logging.CRITICAL) +``` + +### Connection timeouts + +Increase timeout values: + +```python +server = await manager.start_client( + "my-server", url, + timeout=30.0, + sse_read_timeout=600.0 +) +``` + +### Too many agents + +Reuse MCP servers across agents - one server can serve many agents. + +## Next Steps + +1. Read [ARCHITECTURE.md](ARCHITECTURE.md) for scaling patterns +2. Try [adding_agents_example.py](adding_agents_example.py) to see how to add agents +3. Build your own custom agents using `AgentConfig` +4. Create hierarchical orchestrators for complex workflows + +## Contributing + +To add a new agent type: +1. Define an `AgentConfig` in `agent_factory.py` +2. Add a convenience method (optional) +3. Update this README with your use case + +Happy building! 🚀 diff --git a/examples/mcp/__init__.py b/examples/mcp/__init__.py new file mode 100644 index 000000000..4698cfa66 --- /dev/null +++ b/examples/mcp/__init__.py @@ -0,0 +1,20 @@ +""" +MCP Multi-Server Example Package + +This package provides a modular architecture for building multi-agent systems +with MCP (Model Context Protocol) servers. + +Components: +- mcp_client_manager: Manages MCP server connections with robust cleanup +- agent_factory: Creates and configures specialized agents +- multiple_mcp_servers_example: Main example demonstrating the architecture +""" + +from examples.mcp.agent_factory import AgentConfig, AgentFactory +from examples.mcp.mcp_client_manager import StreamableHttpClientManager + +__all__ = [ + "StreamableHttpClientManager", + "AgentFactory", + "AgentConfig", +] diff --git a/examples/mcp/adding_agents_example.py b/examples/mcp/adding_agents_example.py new file mode 100644 index 000000000..2ae3f9e4d --- /dev/null +++ b/examples/mcp/adding_agents_example.py @@ -0,0 +1,179 @@ +""" +Example: Adding New Agents to the Factory + +This example demonstrates how easy it is to add new agent types +to the AgentFactory for scaling to 10-100+ agents. +""" + +import asyncio +import sys +from pathlib import Path + +# Add project root to path for imports +project_root = Path(__file__).parent.parent.parent +sys.path.insert(0, str(project_root)) + +from examples.mcp import AgentConfig, AgentFactory, StreamableHttpClientManager + + +async def main(): + """Demonstrate adding custom agents to the factory.""" + + # Set up MCP servers (assuming they're running) + manager = StreamableHttpClientManager() + + try: + # Connect to servers + math_server = await manager.start_client( + "math", "http://localhost:8001/mcp" + ) + text_server = await manager.start_client( + "text", "http://localhost:8002/mcp" + ) + + # Create factory + factory = AgentFactory(math_server=math_server, text_server=text_server) + + # ============================================================ + # Example 1: Add a single custom agent + # ============================================================ + print("Example 1: Adding a custom agent\n") + + # Define configuration + code_review_config = AgentConfig( + name="Code Review Agent", + instructions="""You are a code review specialist. + Analyze code for best practices, bugs, and improvements. + Provide constructive feedback.""", + max_turns=15, + mcp_servers=[text_server], # Uses text analysis tools + ) + + # Add to factory + factory.add_agent_config("code_review", code_review_config) + + # Create and use the agent + code_review_agent = factory.create_agent("code_review") + print(f"Created: {code_review_agent.name}") + print(f"Available types: {factory.list_agent_types()}\n") + + # ============================================================ + # Example 2: Add multiple agents programmatically + # ============================================================ + print("Example 2: Adding multiple agents programmatically\n") + + # Define agent types + agent_definitions = [ + { + "type": "calculator", + "name": "Calculator Agent", + "instructions": "Perform complex calculations using math tools", + "server": math_server, + "max_turns": 8, + }, + { + "type": "summarizer", + "name": "Summarizer Agent", + "instructions": "Summarize text concisely while preserving key points", + "server": text_server, + "max_turns": 10, + }, + { + "type": "translator", + "name": "Translator Agent", + "instructions": "Translate text between languages", + "server": text_server, + "max_turns": 5, + }, + ] + + # Add all agents to factory + for defn in agent_definitions: + config = AgentConfig( + name=defn["name"], + instructions=defn["instructions"], + max_turns=defn["max_turns"], + mcp_servers=[defn["server"]], + ) + factory.add_agent_config(defn["type"], config) + + print(f"Total agent types: {len(factory.list_agent_types())}") + print(f"Available: {factory.list_agent_types()}\n") + + # ============================================================ + # Example 3: Create orchestrator with custom agents + # ============================================================ + print("Example 3: Custom orchestrator with new agents\n") + + # Create custom agent instances + calculator = factory.create_agent("calculator") + summarizer = factory.create_agent("summarizer") + translator = factory.create_agent("translator") + + # Define tool-agent mappings + custom_tool_agents = [ + (calculator, "calculate", "Perform mathematical calculations"), + (summarizer, "summarize", "Summarize text content"), + (translator, "translate", "Translate text between languages"), + ] + + # Create orchestrator + orchestrator = factory.create_orchestrator( + tool_agents=custom_tool_agents, + custom_instructions="""You are a multi-purpose assistant. + You can calculate, summarize, and translate. + Choose the right tool for each task.""", + ) + + print(f"Created orchestrator with {len(orchestrator.tools)} tools") + print(f"Tools: {[t.__name__ for t in orchestrator.tools]}\n") + + # ============================================================ + # Example 4: Scaling to many agents + # ============================================================ + print("Example 4: Scaling to many specialized agents\n") + + # Create 10 domain-specific agents + domains = [ + "finance", + "healthcare", + "education", + "legal", + "marketing", + "hr", + "sales", + "engineering", + "design", + "research", + ] + + for domain in domains: + config = AgentConfig( + name=f"{domain.capitalize()} Agent", + instructions=f"You are a {domain} specialist. Use available tools to help with {domain}-related tasks.", + max_turns=12, + mcp_servers=[ + math_server if domain in ["finance", "engineering"] else text_server + ], + ) + factory.add_agent_config(domain, config) + + print(f"Total agent types now: {len(factory.list_agent_types())}") + print(f"Specialized domains: {domains}") + print("\nYou can now create any agent with:") + print(" agent = factory.create_agent('domain_name')") + + finally: + await manager.stop_all() + + +if __name__ == "__main__": + import logging + + # Suppress MCP cleanup errors + logging.getLogger("agents.mcp.server").setLevel(logging.CRITICAL) + + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\nInterrupted") diff --git a/examples/mcp/agent_factory.py b/examples/mcp/agent_factory.py new file mode 100644 index 000000000..a79154e67 --- /dev/null +++ b/examples/mcp/agent_factory.py @@ -0,0 +1,294 @@ +""" +Agent Factory for creating and configuring specialized agents with MCP tools. + +This module provides a scalable way to define and create agents that work with +MCP servers. It supports: +- Agent templates/configs for reusability +- Custom tool wrapping with streaming support +- Easy addition of new agent types + +Usage: + factory = AgentFactory(mcp_servers) + math_agent = factory.create_math_agent() + orchestrator = factory.create_orchestrator() +""" + +from typing import Any, Callable + +from openai.types.responses import ResponseTextDeltaEvent + +from agents import Agent, Runner, function_tool +from agents.mcp import MCPServerStreamableHttp + + +class AgentConfig: + """Configuration for an agent type.""" + + def __init__( + self, + name: str, + instructions: str, + max_turns: int = 10, + mcp_servers: list[MCPServerStreamableHttp] | None = None, + tools: list[Any] | None = None, + ): + self.name = name + self.instructions = instructions + self.max_turns = max_turns + self.mcp_servers = mcp_servers or [] + self.tools = tools or [] + + +class AgentFactory: + """ + Factory for creating specialized agents with MCP tool integration. + + This factory encapsulates agent creation logic and makes it easy to: + - Define agent templates + - Create agents with MCP server tools + - Wrap agents as custom tools with streaming + - Scale to many agent types + """ + + def __init__( + self, + math_server: MCPServerStreamableHttp | None = None, + text_server: MCPServerStreamableHttp | None = None, + data_server: MCPServerStreamableHttp | None = None, + ): + """ + Initialize the factory with MCP servers. + + Args: + math_server: MCP server for math operations + text_server: MCP server for text operations + data_server: MCP server for data operations + """ + self.math_server = math_server + self.text_server = text_server + self.data_server = data_server + + # Agent configurations + self._configs = self._setup_configs() + + def _setup_configs(self) -> dict[str, AgentConfig]: + """Define all agent configurations.""" + return { + "math": AgentConfig( + name="Math Agent", + instructions="""You are a math specialist. Use the available math tools to perform calculations. + Always show your work step by step.""", + max_turns=10, + mcp_servers=[self.math_server] if self.math_server else [], + ), + "text": AgentConfig( + name="Text Agent", + instructions="""You are a text processing specialist. Use the available text tools to manipulate text. + Explain what transformations you're applying.""", + max_turns=8, + mcp_servers=[self.text_server] if self.text_server else [], + ), + "data": AgentConfig( + name="Data Agent", + instructions="""You are a data processing specialist. Use the available data tools to process lists and arrays. + Explain each step of data transformation.""", + max_turns=8, + mcp_servers=[self.data_server] if self.data_server else [], + ), + } + + def create_agent(self, agent_type: str) -> Agent: + """ + Create an agent by type. + + Args: + agent_type: Type of agent to create ("math", "text", "data", etc.) + + Returns: + Configured Agent instance + + Raises: + ValueError: If agent_type is unknown + """ + if agent_type not in self._configs: + raise ValueError( + f"Unknown agent type: {agent_type}. Available: {list(self._configs.keys())}" + ) + + config = self._configs[agent_type] + return Agent( + name=config.name, + instructions=config.instructions, + mcp_servers=config.mcp_servers, + tools=config.tools, + ) + + def create_math_agent(self) -> Agent: + """Create a math specialist agent.""" + return self.create_agent("math") + + def create_text_agent(self) -> Agent: + """Create a text processing agent.""" + return self.create_agent("text") + + def create_data_agent(self) -> Agent: + """Create a data processing agent.""" + return self.create_agent("data") + + def create_custom_tool( + self, + agent: Agent, + tool_name: str, + tool_description: str, + max_turns: int = 10, + show_streaming: bool = True, + ) -> Callable: + """ + Wrap an agent as a custom tool with streaming support. + + Args: + agent: The agent to wrap + tool_name: Name for the tool + tool_description: Description of what the tool does + max_turns: Maximum turns for agent execution + show_streaming: Whether to print streaming output + + Returns: + A function_tool decorated async function + """ + + @function_tool + async def custom_tool(task: str) -> str: + f""" + {tool_description} + + Args: + task: Description of the task to perform + + Returns: + Result of the operation + """ + if show_streaming: + print(f" [{agent.name}] Processing: {task}") + + result_stream = Runner.run_streamed( + agent, + input=task, + max_turns=max_turns, + ) + + # Stream and collect output + final_output = "" + if show_streaming: + print(f" [{agent.name}] Result: ", end="", flush=True) + + async for event in result_stream.stream_events(): + if event.type == "raw_response_event" and isinstance( + event.data, ResponseTextDeltaEvent + ): + if show_streaming: + print(event.data.delta, end="", flush=True) + final_output += event.data.delta + + if show_streaming: + print() # New line after streaming + + return final_output + + # Set the function name for better debugging + custom_tool.__name__ = tool_name + return custom_tool + + def create_orchestrator( + self, + tool_agents: list[tuple[Agent, str, str]] | None = None, + custom_instructions: str | None = None, + ) -> Agent: + """ + Create an orchestrator agent that coordinates multiple specialized agents. + + Args: + tool_agents: List of (agent, tool_name, description) tuples. + If None, uses default math/text/data agents. + custom_instructions: Custom instructions for the orchestrator. + If None, uses default instructions. + + Returns: + Orchestrator Agent with custom tool-agents + """ + # Use default agents if none provided + if tool_agents is None: + math_agent = self.create_math_agent() + text_agent = self.create_text_agent() + data_agent = self.create_data_agent() + + tool_agents = [ + ( + math_agent, + "math_operations", + "Perform mathematical operations like addition, multiplication, and exponentiation.", + ), + ( + text_agent, + "text_operations", + "Perform text operations like reversing, counting words, or converting case.", + ), + ( + data_agent, + "data_operations", + "Perform data operations like filtering, sorting, or aggregating lists.", + ), + ] + + # Create custom tools from agents + tools = [ + self.create_custom_tool( + agent, + tool_name, + description, + max_turns=self._configs.get( + tool_name.replace("_operations", ""), AgentConfig("", "", 10) + ).max_turns, + ) + for agent, tool_name, description in tool_agents + ] + + # Default orchestrator instructions + if custom_instructions is None: + custom_instructions = """You are a data processing orchestrator. + You coordinate between specialized agents: + - math_operations: for calculations (add, multiply, power) + - text_operations: for text manipulation (reverse, count words, uppercase) + - data_operations: for data processing (filter, sort, aggregate) + + For complex tasks, break them down and delegate to the appropriate specialist agent. + Combine results when needed to answer the user's question.""" + + return Agent( + name="Data Processing Orchestrator", + instructions=custom_instructions, + tools=tools, + ) + + def add_agent_config(self, agent_type: str, config: AgentConfig): + """ + Add a new agent configuration to the factory. + + Args: + agent_type: Unique identifier for this agent type + config: AgentConfig instance defining the agent + + Example: + config = AgentConfig( + name="Code Agent", + instructions="You are a coding assistant...", + max_turns=15, + mcp_servers=[code_server] + ) + factory.add_agent_config("code", config) + """ + self._configs[agent_type] = config + + def list_agent_types(self) -> list[str]: + """List all available agent types.""" + return list(self._configs.keys()) diff --git a/examples/mcp/mcp_client_manager.py b/examples/mcp/mcp_client_manager.py new file mode 100644 index 000000000..94f04ab4c --- /dev/null +++ b/examples/mcp/mcp_client_manager.py @@ -0,0 +1,188 @@ +""" +MCP Client Manager for managing multiple Streamable HTTP MCP server connections. + +This module provides a robust client manager that handles connection lifecycle +with proper cleanup using double-shielded (anyio + asyncio) teardown to avoid +cancel scope conflicts during shutdown. +""" + +import asyncio +from contextlib import suppress + +import anyio + +from agents.mcp import MCPServerStreamableHttp, MCPServerStreamableHttpParams + + +class StreamableHttpClientManager: + """ + Manages multiple Streamable HTTP MCP server connections without context managers. + + Features: + - Connection pooling and lifecycle management + - Double-shielded cleanup (anyio + asyncio) for safe teardown + - Concurrent cleanup with timeouts + - Graceful error handling during shutdown + + Usage: + manager = StreamableHttpClientManager() + server = await manager.start_client("my-server", "http://localhost:8000/mcp") + # ... use server ... + await manager.stop_all() + """ + + def __init__(self): + self._servers: dict[str, MCPServerStreamableHttp] = {} + + async def start_client( + self, + name: str, + url: str, + *, + headers: dict[str, str] | None = None, + timeout: float | None = 5.0, + sse_read_timeout: float | None = 300.0, + terminate_on_close: bool | None = True, + cache_tools_list: bool = True, + **server_kwargs, + ) -> MCPServerStreamableHttp: + """ + Start and connect to an MCP server. + + Args: + name: Unique identifier for this server connection + url: MCP server URL (e.g., "http://localhost:8000/mcp") + headers: Optional HTTP headers for authentication + timeout: Connection timeout in seconds + sse_read_timeout: Server-Sent Events read timeout in seconds + terminate_on_close: Send termination signal on close + cache_tools_list: Cache the tools list after first fetch + **server_kwargs: Additional arguments for MCPServerStreamableHttp + + Returns: + Connected MCPServerStreamableHttp instance + + Raises: + ValueError: If a client with this name already exists + """ + if name in self._servers: + raise ValueError(f"Client '{name}' already exists") + + # Build connection parameters using MCPServerStreamableHttpParams + params = MCPServerStreamableHttpParams( + url=url, + headers=headers, + timeout=timeout, + sse_read_timeout=sse_read_timeout, + terminate_on_close=terminate_on_close, + ) + + # Create and connect server + server = MCPServerStreamableHttp( + params=params, + cache_tools_list=cache_tools_list, + **server_kwargs, + ) + + try: + await server.connect() + self._servers[name] = server + return server + except (asyncio.CancelledError, Exception) as e: + # Failed to connect - cleanup and re-raise + try: + await server.cleanup() + except Exception: + pass # Ignore cleanup errors + raise ConnectionError(f"Failed to connect to MCP server '{name}' at {url}: {e}") from e + + def get(self, name: str) -> MCPServerStreamableHttp: + """Get a server connection by name.""" + return self._servers[name] + + def list_servers(self) -> list[str]: + """List all active server connection names.""" + return list(self._servers.keys()) + + async def _shielded_cleanup( + self, server: MCPServerStreamableHttp, *, timeout: float | None + ): + """ + Run server.cleanup() with double shielding to prevent cancel scope conflicts. + + Uses both anyio.CancelScope(shield=True) and asyncio.shield() to ensure + cleanup completes even during event loop teardown. + + Args: + server: The MCP server to clean up + timeout: Optional timeout in seconds (None = no timeout) + """ + + async def _inner(): + # AnyIO shield prevents cancel scopes from interrupting cleanup() + with anyio.CancelScope(shield=True): + await server.cleanup() + + if timeout is None: + # Don't allow outer-task cancellation to kill cleanup + with suppress(asyncio.CancelledError): + await asyncio.shield(_inner()) + else: + # Timebox *outside* the shields + with suppress(asyncio.TimeoutError, asyncio.CancelledError): + await asyncio.wait_for(asyncio.shield(_inner()), timeout=timeout) + + async def stop_client( + self, name: str, *, timeout: float = 5.0, suppress_errors: bool = True + ): + """ + Stop a specific MCP client connection with shielded cleanup. + + Args: + name: Name of the client to stop + timeout: Cleanup timeout in seconds + suppress_errors: Whether to suppress cleanup errors (default: True) + """ + server = self._servers.pop(name, None) + if not server: + return + + try: + await self._shielded_cleanup(server, timeout=timeout) + except Exception: + if not suppress_errors: + raise + # Silently ignore cleanup errors during shutdown + + async def stop_all(self, *, timeout_per_client: float = 5.0): + """ + Close all connections concurrently with shielded cleanup. + + Args: + timeout_per_client: Timeout for each client cleanup in seconds + """ + items = list(self._servers.items()) + self._servers.clear() + + if not items: + return + + # Clean up all servers concurrently + tasks = [ + asyncio.create_task( + self._shielded_cleanup(server, timeout=timeout_per_client) + ) + for _, server in items + ] + + # Best-effort: don't explode on cancellation/errors during shutdown + with suppress(asyncio.CancelledError): + await asyncio.gather(*tasks, return_exceptions=True) + + def __len__(self) -> int: + """Return the number of active connections.""" + return len(self._servers) + + def __contains__(self, name: str) -> bool: + """Check if a server connection exists.""" + return name in self._servers diff --git a/examples/mcp/multiple_mcp_servers_example.py b/examples/mcp/multiple_mcp_servers_example.py new file mode 100644 index 000000000..d939d87a8 --- /dev/null +++ b/examples/mcp/multiple_mcp_servers_example.py @@ -0,0 +1,193 @@ +""" +Example demonstrating multiple localhost MCP servers with an orchestrator agent. + +This example shows: +- Multiple MCP servers, each with multiple tools +- Modular architecture with separate manager and factory components +- Custom tool-agents using Runner.run_streamed for advanced configuration +- Orchestrator agent that coordinates specialized agents +- Streaming responses at all levels + +Use case: Data processing pipeline with math and text operations + +Prerequisites: +1. Start math_server: uv run python examples/mcp/servers/math_server.py +2. Start text_server: uv run python examples/mcp/servers/text_server.py +3. Start data_server: uv run python examples/mcp/servers/data_server.py +""" + +import asyncio +from contextlib import suppress + +from openai.types.responses import ResponseTextDeltaEvent + +from agents import Runner + +# Add project root to path for imports +import sys +from pathlib import Path +project_root = Path(__file__).parent.parent.parent +sys.path.insert(0, str(project_root)) + +from examples.mcp.agent_factory import AgentFactory +from examples.mcp.mcp_client_manager import StreamableHttpClientManager + + +async def main(): + """ + Main example showing how to use multiple MCP servers with custom tool-agents. + + Assumes MCP servers are already running on ports 8001, 8002, 8003. + """ + + # Initialize client manager + client_manager = StreamableHttpClientManager() + + try: + # Connect to multiple MCP servers + print("Connecting to MCP servers...") + + math_server = await client_manager.start_client( + name="math_server", + url="http://localhost:8001/mcp", + timeout=10.0, + ) + + text_server = await client_manager.start_client( + name="text_server", + url="http://localhost:8002/mcp", + timeout=10.0, + ) + + data_server = await client_manager.start_client( + name="data_server", + url="http://localhost:8003/mcp", + timeout=10.0, + ) + + print("✓ Connected to all MCP servers\n") + + # Get tools from each server for display + math_tools = await math_server.list_tools() + text_tools = await text_server.list_tools() + data_tools = await data_server.list_tools() + + print(f"Math server tools: {[t.name for t in math_tools]}") + print(f"Text server tools: {[t.name for t in text_tools]}") + print(f"Data server tools: {[t.name for t in data_tools]}\n") + + # Create agent factory with MCP servers + factory = AgentFactory( + math_server=math_server, + text_server=text_server, + data_server=data_server, + ) + + # Create orchestrator with default agents + orchestrator = factory.create_orchestrator() + + # Example 1: Math operations + print("=" * 60) + print("Example 1: Math Operations") + print("=" * 60) + stream1 = Runner.run_streamed( + orchestrator, + "Calculate (5 + 3) * 2 and then raise it to the power of 2", + max_turns=15, + ) + final_result1 = "" + print("Orchestrator: ", end="", flush=True) + async for event in stream1.stream_events(): + if event.type == "raw_response_event" and isinstance( + event.data, ResponseTextDeltaEvent + ): + print(event.data.delta, end="", flush=True) + final_result1 += event.data.delta + print(f"\n\nFinal Result: {final_result1}\n") + + # # Example 2: Text operations + # print("=" * 60) + # print("Example 2: Text Operations") + # print("=" * 60) + # stream2 = Runner.run_streamed( + # orchestrator, + # "Take the text 'hello world' and reverse it, then count the words, then convert to uppercase", + # max_turns=15, + # ) + # final_result2 = "" + # print("Orchestrator: ", end="", flush=True) + # async for event in stream2.stream_events(): + # if event.type == "raw_response_event" and isinstance( + # event.data, ResponseTextDeltaEvent + # ): + # print(event.data.delta, end="", flush=True) + # final_result2 += event.data.delta + # print(f"\n\nFinal Result: {final_result2}\n") + + # # Example 3: Data operations + # print("=" * 60) + # print("Example 3: Data Operations") + # print("=" * 60) + # stream3 = Runner.run_streamed( + # orchestrator, + # "I have a list of numbers: [5, 2, 8, 1, 9, 3]. Filter those greater than 3, then sort them, then calculate their sum", + # max_turns=15, + # ) + # final_result3 = "" + # print("Orchestrator: ", end="", flush=True) + # async for event in stream3.stream_events(): + # if event.type == "raw_response_event" and isinstance( + # event.data, ResponseTextDeltaEvent + # ): + # print(event.data.delta, end="", flush=True) + # final_result3 += event.data.delta + # print(f"\n\nFinal Result: {final_result3}\n") + + # # Example 4: Combined operations + # print("=" * 60) + # print("Example 4: Combined Operations") + # print("=" * 60) + # stream4 = Runner.run_streamed( + # orchestrator, + # "Reverse the text 'Data Science', count its words, then multiply that count by 10", + # max_turns=15, + # ) + # final_result4 = "" + # print("Orchestrator: ", end="", flush=True) + # async for event in stream4.stream_events(): + # if event.type == "raw_response_event" and isinstance( + # event.data, ResponseTextDeltaEvent + # ): + # print(event.data.delta, end="", flush=True) + # final_result4 += event.data.delta + # print(f"\n\nFinal Result: {final_result4}\n") + + except KeyboardInterrupt: + print("\n\nInterrupted by user") + except Exception as e: + print(f"\n\nError: {e}") + import traceback + traceback.print_exc() + finally: + # Clean up all connections with shielded cleanup + print("\n\nShutting down...") + # Ensure cleanup completes even if the main task is being cancelled + with suppress(asyncio.CancelledError): + await asyncio.shield(client_manager.stop_all()) + print("✓ All client connections closed") + + +if __name__ == "__main__": + import logging + + # Suppress the "Error cleaning up server" messages from the MCP SDK + # These are logged but harmless - they occur during normal shutdown + # due to anyio cancel scope task migration + logging.getLogger("agents.mcp.server").setLevel(logging.CRITICAL) + + # Use asyncio.run with shielded cleanup to avoid cancel scope errors + # The shielded cleanup ensures proper MCP client teardown + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\nInterrupted") diff --git a/examples/mcp/servers/README.md b/examples/mcp/servers/README.md new file mode 100644 index 000000000..77e6e59ce --- /dev/null +++ b/examples/mcp/servers/README.md @@ -0,0 +1,127 @@ +# Multiple MCP Servers Example + +This directory contains MCP server implementations that work with the modular multi-agent architecture. + +**📖 For architecture details, see [../ARCHITECTURE.md](../ARCHITECTURE.md)** + +## Overview + +The example includes: + +1. **Three MCP Servers** (each with multiple tools): + - `math_server.py` - Mathematical operations (add, multiply, power) + - `text_server.py` - Text manipulation (reverse_text, count_words, to_uppercase) + - `data_server.py` - Data processing (filter_list, sort_list, aggregate) + +2. **Client Application** (`multiple_mcp_servers_example.py`): + - Uses `StreamableHttpClientManager` to manage multiple server connections + - Creates specialized agents for each domain (math, text, data) + - Uses custom tool functions with `Runner.run_streamed()` for advanced configuration + - Creates an orchestrator agent that coordinates between specialized agents + - Demonstrates streaming responses and complex multi-step operations + +## Setup + +1. Create a `.env` file in the project root with your OpenAI API key: + ```bash + OPENAI_API_KEY=your_api_key_here + ``` + +2. Install dependencies: + ```bash + uv sync + ``` + +## Running the Example + +### Quick Start + +Run the provided shell script that starts all servers and the client: + +```bash +./examples/mcp/servers/run_example.sh +``` + +Or manually start servers in separate terminals: + +If you prefer to run servers manually, open three separate terminals: + +**Terminal 1 - Math Server (port 8001):** +```bash +cd examples/mcp/servers +uv run uvicorn math_server:mcp.app --port 8001 +``` + +**Terminal 2 - Text Server (port 8002):** +```bash +cd examples/mcp/servers +uv run uvicorn text_server:mcp.app --port 8002 +``` + +**Terminal 3 - Data Server (port 8003):** +```bash +cd examples/mcp/servers +uv run uvicorn data_server:mcp.app --port 8003 +``` + +**Terminal 4 - Run Client:** +```bash +uv run --env-file .env examples/mcp/multiple_mcp_servers_example.py +``` + +## What Happens + +The client will: +1. Connect to all three MCP servers +2. Collect all available tools from each server +3. Create specialized agents (math, text, data) with their respective tools +4. Create custom tool functions using `Runner.run_streamed()` with advanced configuration (max_turns, etc.) +5. Create an orchestrator agent that uses these custom tool-agents +6. Run several example queries with streaming output +7. Demonstrate complex operations that chain multiple agents together + +## Example Queries + +The demo includes four examples: + +1. **Math Operations**: `(5 + 3) * 2 ^ 2` +2. **Text Operations**: Reverse, count words, uppercase +3. **Data Operations**: Filter, sort, aggregate numbers +4. **Combined**: Mix text and math operations + +## Architecture + +``` +┌────────────────────────────────────────────────────────┐ +│ Orchestrator Agent │ +│ (coordinates specialized agents) │ +└────────────────────────────────────────────────────────┘ + │ + ┌───────────────────┼───────────────────┐ + │ │ │ + ▼ ▼ ▼ +┌─────────────┐ ┌─────────────┐ ┌─────────────┐ +│Math Agent │ │Text Agent │ │Data Agent │ +│(custom tool)│ │(custom tool)│ │(custom tool)│ +└─────────────┘ └─────────────┘ └─────────────┘ + │ │ │ + │ StreamableHttpClientManager │ + │ │ │ + ┌───▼────┐ ┌───▼────┐ ┌───▼────┐ + │ Math │ │ Text │ │ Data │ + │ Server │ │ Server │ │ Server │ + │ :8001 │ │ :8002 │ │ :8003 │ + └────────┘ └────────┘ └────────┘ +``` + +## Key Concepts + +- **StreamableHttpClientManager**: Manages multiple MCP server connections without context managers +- **Specialized Agents**: Each domain (math, text, data) has its own agent with specific tools +- **Custom Tool Functions**: Using `@function_tool` with `Runner.run_streamed()` for advanced configuration: + - Set `max_turns` per agent + - Configure `run_config` options + - Handle streaming responses + - Add custom logging/monitoring +- **Agent Orchestration**: Top-level agent coordinates between specialized agents +- **Streaming**: Real-time response streaming at both agent and orchestrator levels diff --git a/examples/mcp/servers/data_server.py b/examples/mcp/servers/data_server.py new file mode 100644 index 000000000..434ffd162 --- /dev/null +++ b/examples/mcp/servers/data_server.py @@ -0,0 +1,91 @@ +""" +Data MCP Server - provides data processing operations. + +Tools: +- filter_list: Filter a list based on a condition +- sort_list: Sort a list +- aggregate: Aggregate a list (sum, average, min, max) + +Run with: uv run examples/mcp/servers/data_server.py +""" + +import os +from typing import Literal + +from dotenv import load_dotenv +from mcp.server.fastmcp import FastMCP + +load_dotenv() + +# Create FastMCP server +mcp = FastMCP("Data Operations Server",port=8003) + + +@mcp.tool() +def filter_list(numbers: list[float], threshold: float, comparison: str = "greater") -> list[float]: + """Filter a list of numbers based on a threshold. + + Args: + numbers: List of numbers to filter + threshold: Threshold value + comparison: Comparison type ('greater', 'less', 'equal') + + Returns: + Filtered list + """ + if comparison == "greater": + return [n for n in numbers if n > threshold] + elif comparison == "less": + return [n for n in numbers if n < threshold] + elif comparison == "equal": + return [n for n in numbers if n == threshold] + else: + return numbers + + +@mcp.tool() +def sort_list(numbers: list[float], reverse: bool = False) -> list[float]: + """Sort a list of numbers. + + Args: + numbers: List of numbers to sort + reverse: Sort in descending order if True + + Returns: + Sorted list + """ + return sorted(numbers, reverse=reverse) + + +@mcp.tool() +def aggregate( + numbers: list[float], operation: Literal["sum", "average", "min", "max"] = "sum" +) -> float: + """Aggregate a list of numbers using various operations. + + Args: + numbers: List of numbers to aggregate + operation: Type of aggregation ('sum', 'average', 'min', 'max') + + Returns: + Aggregated result + """ + if not numbers: + return 0.0 + + if operation == "sum": + return sum(numbers) + elif operation == "average": + return sum(numbers) / len(numbers) + elif operation == "min": + return min(numbers) + elif operation == "max": + return max(numbers) + else: + return sum(numbers) + + +if __name__ == "__main__": + # Run the server + # To specify a port, use: uvicorn data_server:mcp.app --port 8003 + mcp.run(transport="streamable-http") diff --git a/examples/mcp/servers/math_server.py b/examples/mcp/servers/math_server.py new file mode 100644 index 000000000..cf1b5d778 --- /dev/null +++ b/examples/mcp/servers/math_server.py @@ -0,0 +1,68 @@ +""" +Math MCP Server - provides mathematical operations. + +Tools: +- add: Add two numbers +- multiply: Multiply two numbers +- power: Raise a number to a power + +Run with: uv run examples/mcp/servers/math_server.py +""" + +import os + +from dotenv import load_dotenv +from mcp.server.fastmcp import FastMCP + +load_dotenv() + +# Create FastMCP server +mcp = FastMCP("Math Operations Server",port=8001) + + +@mcp.tool() +def add(a: float, b: float) -> float: + """Add two numbers together. + + Args: + a: First number + b: Second number + + Returns: + Sum of a and b + """ + return a + b + + +@mcp.tool() +def multiply(a: float, b: float) -> float: + """Multiply two numbers. + + Args: + a: First number + b: Second number + + Returns: + Product of a and b + """ + return a * b + + +@mcp.tool() +def power(base: float, exponent: float) -> float: + """Raise a number to a power. + + Args: + base: Base number + exponent: Exponent to raise to + + Returns: + Result of base^exponent + """ + return base**exponent + + +if __name__ == "__main__": + # Run the server + # To specify a port, use: uvicorn math_server:mcp.app --port 8001 + mcp.run(transport="streamable-http") diff --git a/examples/mcp/servers/text_server.py b/examples/mcp/servers/text_server.py new file mode 100644 index 000000000..2f43acf3b --- /dev/null +++ b/examples/mcp/servers/text_server.py @@ -0,0 +1,65 @@ +""" +Text MCP Server - provides text manipulation operations. + +Tools: +- reverse_text: Reverse a string +- count_words: Count words in a string +- to_uppercase: Convert text to uppercase + +Run with: uv run examples/mcp/servers/text_server.py +""" + +import os + +from dotenv import load_dotenv +from mcp.server.fastmcp import FastMCP + +load_dotenv() + +# Create FastMCP server +mcp = FastMCP("Text Operations Server",port=8002) + + +@mcp.tool() +def reverse_text(text: str) -> str: + """Reverse a string. + + Args: + text: Text to reverse + + Returns: + Reversed text + """ + return text[::-1] + + +@mcp.tool() +def count_words(text: str) -> int: + """Count the number of words in a text. + + Args: + text: Text to count words in + + Returns: + Number of words + """ + return len(text.split()) + + +@mcp.tool() +def to_uppercase(text: str) -> str: + """Convert text to uppercase. + + Args: + text: Text to convert + + Returns: + Text in uppercase + """ + return text.upper() + + +if __name__ == "__main__": + # Run the server + # To specify a port, use: uvicorn text_server:mcp.app --port 8002 + mcp.run(transport="streamable-http")