-
Notifications
You must be signed in to change notification settings - Fork 55
Expand file tree
/
Copy pathexample_streamable_http.py
More file actions
58 lines (48 loc) · 1.71 KB
/
example_streamable_http.py
File metadata and controls
58 lines (48 loc) · 1.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
"""Example usage of Minion Agent with Streamable HTTP MCP tool."""
import asyncio
import logging
import os
from dotenv import load_dotenv
from minion_agent import MinionAgent, AgentConfig, AgentFramework
from minion_agent.config import MCPStreamableHttp
from minion_agent.logging import setup_logger
from minion.agents import (
CodeAgent,
ToolCallingAgent,
)
# Load environment variables from .env file
load_dotenv()
# Configure the agent with streamable HTTP MCP tool
agent_config = AgentConfig(
model_id=os.environ.get("AZURE_DEPLOYMENT_NAME"),
name="streamable_http_assistant",
description="A helpful assistant with streamable HTTP capabilities",
model_args={
"azure_endpoint": os.environ.get("AZURE_OPENAI_ENDPOINT"),
"api_key": os.environ.get("AZURE_OPENAI_API_KEY"),
"api_version": os.environ.get("OPENAI_API_VERSION"),
"model": "gpt-4o", # Actual model to use in minion framework
},
tools=[
# Add streamable HTTP MCP tool pointing to localhost:3000/mcp
MCPStreamableHttp(
url="http://localhost:3000/mcp"
),
],
agent_type=CodeAgent,
)
async def main():
try:
# Create and run the agent
agent = await MinionAgent.create_async(AgentFramework.EXTERNAL_MINION_AGENT, agent_config)
# Test the streamable HTTP tool
result = await agent.run_async("Test the streamable HTTP tool and show me what capabilities it has")
print("Agent's response:", result.final_output.content)
print("done")
except Exception as e:
print(f"Error: {str(e)}")
import traceback
traceback.print_exc()
raise
if __name__ == "__main__":
asyncio.run(main())