Skip to content

Commit cb96b9b

Browse files
theinthein
authored andcommitted
Add multi-agent MCP system example with modular architecture
This commit introduces a comprehensive example demonstrating how to build scalable multi-agent systems using the Model Context Protocol (MCP). The architecture supports coordination between multiple MCP servers and specialized agents, designed to scale from a few agents to 100+ agents. Key components: 1. **MCP Client Manager** (`mcp_client_manager.py`) - Manages multiple MCP server connections with robust lifecycle management - Implements double-shielded cleanup (anyio + asyncio) to prevent cancel scope conflicts during shutdown - Supports concurrent cleanup with configurable timeouts - Uses MCPServerStreamableHttpParams for proper connection configuration 2. **Agent Factory** (`agent_factory.py`) - Factory pattern for creating and configuring specialized agents at scale - Supports dynamic agent registration via AgentConfig - Creates custom tool-agents using Runner.run_streamed() with advanced configuration - Enables agent orchestration with hierarchical coordination 3. **MCP Servers** (`servers/`) - Three example servers demonstrating different domains: - Math server: mathematical operations (add, multiply, power) - Text server: text manipulation (reverse, count words, uppercase) - Data server: data processing (filter, sort, aggregate) - Each server built with FastMCP using streamable-http transport 4. **Examples** - `multiple_mcp_servers_example.py`: Complete demonstration of orchestrator coordinating specialized agents - `adding_agents_example.py`: Shows how to dynamically add new agent types to the factory 5. **Documentation** - `README.md`: Quick start guide and common use cases - `ARCHITECTURE.md`: Detailed architecture guide for scaling patterns - `servers/README.md`: Server setup and usage instructions Technical highlights: - Streaming responses at all levels using ResponseTextDeltaEvent - Custom tool functions with @function_tool decorator wrapping Runner.run_streamed() - Proper error handling for connection failures and graceful shutdown - Logger suppression for harmless SDK cleanup messages - Reusable MCP connections across multiple agents for efficiency This architecture enables building complex multi-agent workflows with proper separation of concerns, robust error handling, and the ability to scale to enterprise-level agent systems.
1 parent 095496e commit cb96b9b

13 files changed

+1819
-0
lines changed

examples/mcp/ARCHITECTURE.md

Lines changed: 285 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
# Multi-Agent MCP Architecture
2+
3+
This directory contains a scalable, modular architecture for building multi-agent systems with MCP servers.
4+
5+
## Directory Structure
6+
7+
```
8+
examples/mcp/
9+
├── __init__.py # Package exports
10+
├── mcp_client_manager.py # MCP connection management
11+
├── agent_factory.py # Agent creation and configuration
12+
├── multiple_mcp_servers_example.py # Main example
13+
├── ARCHITECTURE.md # This file
14+
├── servers/ # MCP server implementations
15+
│ ├── math_server.py
16+
│ ├── text_server.py
17+
│ ├── data_server.py
18+
│ └── README.md
19+
└── ...other examples...
20+
```
21+
22+
## Core Components
23+
24+
### 1. `mcp_client_manager.py`
25+
26+
**Purpose**: Manages lifecycle of MCP server connections
27+
28+
**Key Features**:
29+
- Connection pooling
30+
- Double-shielded cleanup (anyio + asyncio)
31+
- Concurrent shutdown with timeouts
32+
- Graceful error handling
33+
34+
**Usage**:
35+
```python
36+
from examples.mcp import StreamableHttpClientManager
37+
38+
manager = StreamableHttpClientManager()
39+
server = await manager.start_client("my-server", "http://localhost:8000/mcp")
40+
# ... use server ...
41+
await manager.stop_all()
42+
```
43+
44+
### 2. `agent_factory.py`
45+
46+
**Purpose**: Creates and configures specialized agents
47+
48+
**Key Features**:
49+
- Agent templates/configurations
50+
- Custom tool wrapping with streaming
51+
- Easy addition of new agent types
52+
- Orchestrator creation
53+
54+
**Usage**:
55+
```python
56+
from examples.mcp import AgentFactory
57+
58+
# Create factory with MCP servers
59+
factory = AgentFactory(
60+
math_server=math_server,
61+
text_server=text_server,
62+
)
63+
64+
# Create individual agents
65+
math_agent = factory.create_math_agent()
66+
text_agent = factory.create_text_agent()
67+
68+
# Create orchestrator
69+
orchestrator = factory.create_orchestrator()
70+
```
71+
72+
### 3. `multiple_mcp_servers_example.py`
73+
74+
**Purpose**: Demonstrates the complete architecture
75+
76+
**Shows**:
77+
- Multi-server connection setup
78+
- Agent creation via factory
79+
- Streaming orchestration
80+
- Proper cleanup
81+
82+
## Adding New Agent Types
83+
84+
### Step 1: Add Agent Configuration
85+
86+
```python
87+
# In agent_factory.py _setup_configs()
88+
"code": AgentConfig(
89+
name="Code Agent",
90+
instructions="You are a coding assistant...",
91+
max_turns=15,
92+
mcp_servers=[code_server] if code_server else [],
93+
),
94+
```
95+
96+
### Step 2: Add Convenience Method (Optional)
97+
98+
```python
99+
# In AgentFactory class
100+
def create_code_agent(self) -> Agent:
101+
"""Create a code assistant agent."""
102+
return self.create_agent("code")
103+
```
104+
105+
### Step 3: Add to Orchestrator
106+
107+
```python
108+
# When creating orchestrator
109+
code_agent = factory.create_code_agent()
110+
tool_agents = [
111+
(code_agent, "code_operations", "Perform code-related tasks..."),
112+
# ... other agents
113+
]
114+
orchestrator = factory.create_orchestrator(tool_agents=tool_agents)
115+
```
116+
117+
## Scaling to 100+ Agents
118+
119+
The architecture is designed for scale:
120+
121+
### Dynamic Agent Registration
122+
123+
```python
124+
# Add agents at runtime
125+
from examples.mcp import AgentConfig
126+
127+
for i in range(100):
128+
config = AgentConfig(
129+
name=f"Agent {i}",
130+
instructions=f"You are agent number {i}...",
131+
max_turns=10,
132+
mcp_servers=[servers[i % len(servers)]]
133+
)
134+
factory.add_agent_config(f"agent_{i}", config)
135+
```
136+
137+
### Agent Groups
138+
139+
```python
140+
# Group related agents
141+
data_agents = [
142+
factory.create_agent(f"data_agent_{i}")
143+
for i in range(10)
144+
]
145+
146+
analysis_agents = [
147+
factory.create_agent(f"analysis_agent_{i}")
148+
for i in range(10)
149+
]
150+
```
151+
152+
### Hierarchical Orchestration
153+
154+
```python
155+
# Create sub-orchestrators for different domains
156+
data_orchestrator = factory.create_orchestrator(
157+
tool_agents=[(agent, f"data_op_{i}", "...") for i, agent in enumerate(data_agents)]
158+
)
159+
160+
analysis_orchestrator = factory.create_orchestrator(
161+
tool_agents=[(agent, f"analysis_op_{i}", "...") for i, agent in enumerate(analysis_agents)]
162+
)
163+
164+
# Top-level orchestrator coordinates sub-orchestrators
165+
main_orchestrator = factory.create_orchestrator(
166+
tool_agents=[
167+
(data_orchestrator, "data_operations", "Handle data tasks"),
168+
(analysis_orchestrator, "analysis_operations", "Handle analysis tasks"),
169+
]
170+
)
171+
```
172+
173+
## Best Practices
174+
175+
### 1. Connection Management
176+
177+
- Always use `StreamableHttpClientManager` for multiple servers
178+
- Set appropriate timeouts for your use case
179+
- Use `stop_all()` in a finally block for cleanup
180+
181+
### 2. Agent Organization
182+
183+
- Group related agents by domain
184+
- Use descriptive names and instructions
185+
- Set appropriate `max_turns` per agent type
186+
187+
### 3. Error Handling
188+
189+
- Leverage the built-in shielded cleanup
190+
- Suppress cleanup errors (they're harmless)
191+
- Add custom error handling for business logic
192+
193+
### 4. Performance
194+
195+
- MCP servers can be shared across multiple agents
196+
- Connection pooling is automatic
197+
- Cleanup happens concurrently
198+
199+
## Common Patterns
200+
201+
### Pattern 1: Specialized Agent Pool
202+
203+
```python
204+
# Create a pool of specialized agents
205+
agents = {
206+
"math": factory.create_math_agent(),
207+
"text": factory.create_text_agent(),
208+
"data": factory.create_data_agent(),
209+
}
210+
211+
# Use based on task type
212+
task_type = determine_task_type(user_input)
213+
agent = agents[task_type]
214+
result = await Runner.run(agent, user_input)
215+
```
216+
217+
### Pattern 2: Dynamic Agent Selection
218+
219+
```python
220+
# Let orchestrator choose the right agent
221+
orchestrator = factory.create_orchestrator()
222+
result = await Runner.run(orchestrator, user_input)
223+
# Orchestrator automatically delegates to specialized agents
224+
```
225+
226+
### Pattern 3: Pipeline Processing
227+
228+
```python
229+
# Chain agents for multi-step processing
230+
async def process_pipeline(data):
231+
# Step 1: Data cleaning
232+
data_agent = factory.create_data_agent()
233+
cleaned = await Runner.run(data_agent, f"Clean this data: {data}")
234+
235+
# Step 2: Analysis
236+
analysis_agent = factory.create_agent("analysis")
237+
analyzed = await Runner.run(analysis_agent, f"Analyze: {cleaned.final_output}")
238+
239+
# Step 3: Reporting
240+
report_agent = factory.create_agent("report")
241+
report = await Runner.run(report_agent, f"Create report: {analyzed.final_output}")
242+
243+
return report.final_output
244+
```
245+
246+
## Troubleshooting
247+
248+
### Issue: Cancel Scope Errors
249+
250+
**Solution**: The architecture includes double-shielded cleanup and logger suppression. Make sure you're using the latest version of the managers.
251+
252+
### Issue: Connection Timeouts
253+
254+
**Solution**: Increase timeout values when creating clients:
255+
```python
256+
server = await manager.start_client(
257+
"my-server",
258+
url,
259+
timeout=30.0, # Increase from default 5.0
260+
sse_read_timeout=600.0 # Increase from default 300.0
261+
)
262+
```
263+
264+
### Issue: Too Many Open Connections
265+
266+
**Solution**: Reuse MCP server connections across agents:
267+
```python
268+
# Good: Share one server across many agents
269+
math_server = await manager.start_client("math", url)
270+
for i in range(100):
271+
agent = AgentConfig(..., mcp_servers=[math_server])
272+
273+
# Bad: Create separate connection per agent
274+
# (Don't do this)
275+
```
276+
277+
## Future Enhancements
278+
279+
Potential additions to the architecture:
280+
281+
1. **Agent Registry**: Central registry for discovering available agents
282+
2. **Load Balancing**: Distribute work across multiple MCP servers
283+
3. **Caching**: Cache agent responses for common queries
284+
4. **Monitoring**: Built-in metrics and tracing
285+
5. **Configuration Files**: YAML/JSON configs for agent definitions

0 commit comments

Comments
 (0)