Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 41 additions & 15 deletions src/praisonai-agents/praisonaiagents/mcp/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class MCP:
```
"""

def __init__(self, command_or_string=None, args=None, *, command=None, timeout=60, debug=False, **kwargs):
def __init__(self, command_or_string=None, args=None, *, command=None, timeout=60, debug=False, transport="auto", **kwargs):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider refactoring the constructor to reduce complexity.

The constructor has become quite complex with multiple responsibilities. Consider extracting transport detection and client initialization into separate methods:

def __init__(self, command_or_string=None, args=None, *, command=None, timeout=60, debug=False, transport="auto", **kwargs):
    # Handle backward compatibility and basic setup
    if command_or_string is None and command is not None:
        command_or_string = command
    
    self.timeout = timeout
    self.debug = debug
    self._setup_logging(debug)
    
    # Initialize based on input type
    if self._is_http_url(command_or_string):
        self._initialize_http_client(command_or_string, transport, debug, timeout)
    else:
        self._initialize_stdio_client(command_or_string, args, timeout, **kwargs)

def _is_http_url(self, command_or_string):
    return isinstance(command_or_string, str) and re.match(r'^https?://', command_or_string)

def _initialize_http_client(self, url, transport, debug, timeout):
    # HTTP client initialization logic
    pass

def _initialize_stdio_client(self, command_or_string, args, timeout, **kwargs):
    # Stdio client initialization logic  
    pass
🧰 Tools
🪛 Pylint (3.3.7)

[refactor] 143-143: Too many arguments (7/5)

(R0913)


[refactor] 143-143: Too many local variables (16/15)

(R0914)


[refactor] 143-143: Too many branches (22/12)

(R0912)


[refactor] 143-143: Too many statements (83/50)

(R0915)

🤖 Prompt for AI Agents
In src/praisonai-agents/praisonaiagents/mcp/mcp.py at line 143, the constructor
is too complex with multiple responsibilities. Refactor by extracting transport
detection and client initialization into separate methods: create a method
_is_http_url to check if the input is an HTTP URL, then split the initialization
logic into _initialize_http_client and _initialize_stdio_client methods. Update
the constructor to handle backward compatibility, set basic attributes, and call
these new methods accordingly to simplify and clarify the constructor's flow.

"""
Initialize the MCP connection and get tools.

Expand All @@ -150,10 +150,13 @@ def __init__(self, command_or_string=None, args=None, *, command=None, timeout=6
- A complete command string (e.g., "/path/to/python /path/to/app.py")
- For NPX: 'npx' command with args for smithery tools
- An SSE URL (e.g., "http://localhost:8080/sse")
- An HTTP URL (e.g., "http://localhost:8080/stream")
args: Arguments to pass to the command (when command_or_string is the command)
command: Alternative parameter name for backward compatibility
timeout: Timeout in seconds for MCP server initialization and tool calls (default: 60)
debug: Enable debug logging for MCP operations (default: False)
transport: Transport type - "auto", "sse", "http-streaming", or "stdio"
"auto" will detect based on URL format (default: "auto")
**kwargs: Additional parameters for StdioServerParameters
"""
# Handle backward compatibility with named parameter 'command'
Expand Down Expand Up @@ -187,15 +190,36 @@ def __init__(self, command_or_string=None, args=None, *, command=None, timeout=6
self.timeout = timeout
self.debug = debug

# Check if this is an SSE URL
# Check if this is an HTTP URL
if isinstance(command_or_string, str) and re.match(r'^https?://', command_or_string):
# Import the SSE client implementation
from .mcp_sse import SSEMCPClient
self.sse_client = SSEMCPClient(command_or_string, debug=debug, timeout=timeout)
self._tools = list(self.sse_client.tools)
self.is_sse = True
self.is_npx = False
return
# Determine transport type
if transport == "auto":
# Default to SSE for /sse endpoints, HTTP-streaming otherwise
if command_or_string.endswith('/sse'):
transport = "sse"
else:
transport = "http-streaming"

if transport == "sse":
# Import the SSE client implementation
from .mcp_sse import SSEMCPClient
self.http_client = SSEMCPClient(command_or_string, debug=debug, timeout=timeout)
self._tools = list(self.http_client.tools)
self.is_http = True
self.is_sse = True # Keep for backward compatibility
self.is_npx = False
return
elif transport == "http-streaming":
# Import the HTTP-Streaming client implementation
from .mcp_http_streaming import HTTPStreamingMCPClient
self.http_client = HTTPStreamingMCPClient(command_or_string, debug=debug, timeout=timeout)
self._tools = list(self.http_client.tools)
self.is_http = True
self.is_sse = False
self.is_npx = False
return
else:
raise ValueError(f"Unknown transport type: {transport}")

# Handle the single string format for stdio client
if isinstance(command_or_string, str) and args is None:
Expand Down Expand Up @@ -273,8 +297,8 @@ def _generate_tool_functions(self) -> List[Callable]:
Returns:
List[Callable]: Functions that can be used as tools
"""
if self.is_sse:
return list(self.sse_client.tools)
if self.is_http:
return list(self.http_client.tools)

tool_functions = []

Expand Down Expand Up @@ -445,9 +469,9 @@ def to_openai_tool(self):
Returns:
dict or list: OpenAI-compatible tool definition(s)
"""
if self.is_sse and hasattr(self, 'sse_client') and self.sse_client.tools:
# Return all tools from SSE client
return self.sse_client.to_openai_tools()
if self.is_http and hasattr(self, 'http_client') and self.http_client.tools:
# Return all tools from HTTP client (SSE or HTTP-Streaming)
return self.http_client.to_openai_tools()

# For simplicity, we'll convert the first tool only if multiple exist
# More complex implementations could handle multiple tools
Expand Down Expand Up @@ -485,4 +509,6 @@ def to_openai_tool(self):
def __del__(self):
"""Clean up resources when the object is garbage collected."""
if hasattr(self, 'runner'):
self.runner.shutdown()
self.runner.shutdown()
if hasattr(self, 'http_client') and hasattr(self.http_client, 'shutdown'):
self.http_client.shutdown()
217 changes: 217 additions & 0 deletions src/praisonai-agents/praisonaiagents/mcp/mcp_http_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
"""
HTTP-Streaming client implementation for MCP (Model Context Protocol).
Provides HTTP chunked streaming transport as an alternative to SSE.
"""

import asyncio
import copy
import json
import logging
import queue
import threading
from typing import Any, Dict, List, Optional

from mcp import ClientSession
from mcp.client.session import Transport
from mcp.shared.memory import get_session_from_context

logger = logging.getLogger(__name__)


class HTTPStreamingTransport(Transport):
    """HTTP chunked streaming transport for MCP.

    NOTE(review): this is a skeleton — ``start``, ``send`` and ``receive``
    contain no wire-level logic yet (see the TODOs below). Only the
    open/closed lifecycle bookkeeping is functional.
    """

    def __init__(self, url: str, headers: Optional[Dict[str, str]] = None):
        # Endpoint to stream against plus optional extra request headers.
        self.url = url
        self.headers = headers if headers else {}
        self._closed = False

    def _ensure_open(self) -> None:
        # Shared guard: every I/O entry point must fail after close().
        if self._closed:
            raise RuntimeError("Transport is closed")

    async def start(self) -> None:
        """Initialize the transport."""
        # TODO: Implement actual HTTP streaming connection.
        # For now this is a no-op so the Transport interface is satisfied.
        return None

    async def close(self) -> None:
        """Close the transport; subsequent send/receive calls will raise."""
        self._closed = True

    async def send(self, message: Dict[str, Any]) -> None:
        """Send a message through the transport."""
        self._ensure_open()
        # TODO: Implement actual HTTP streaming send
        # (post *message* as a chunked HTTP request body).

    async def receive(self) -> Dict[str, Any]:
        """Receive a message from the transport."""
        self._ensure_open()
        # TODO: Implement actual HTTP streaming receive
        # (read the next message from the chunked HTTP response stream).
        raise NotImplementedError("HTTP streaming receive not yet implemented")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The HTTPStreamingTransport class's receive method raises a NotImplementedError, and the send and start methods are empty. This will cause the HTTPStreamingMCPClient to fail during initialization. Implement the logic for start, send, and receive to handle HTTP chunked streaming.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Complete the HTTP streaming transport implementation.

The core transport methods (start, send, receive) are not implemented, making the HTTP streaming feature non-functional. This contradicts the PR objective of "introduces HTTP-Streaming support."

Would you like me to help implement the actual HTTP chunked streaming logic using aiohttp or httpx libraries?

🤖 Prompt for AI Agents
In src/praisonai-agents/praisonaiagents/mcp/mcp_http_streaming.py between lines
27 and 50, the async methods start, send, and receive are placeholders without
actual HTTP streaming logic, making the transport non-functional. Implement
these methods using an async HTTP client library like aiohttp or httpx to
establish a chunked HTTP connection in start, send messages as chunked requests
in send, and read chunked responses in receive. Also, ensure proper handling of
connection lifecycle and errors to fulfill the HTTP streaming transport
functionality.



class HTTPStreamingMCPTool:
    """Wrapper exposing a single MCP tool (reached via HTTP streaming) as
    a plain callable.

    Instances support synchronous invocation (``tool(**kwargs)``), an
    async path (:meth:`_async_call`), and conversion to the OpenAI
    function-calling schema (:meth:`to_openai_tool`).
    """

    def __init__(self, tool_def: Dict[str, Any], call_func):
        """
        Args:
            tool_def: Tool definition dict; must contain ``name`` and may
                contain ``description`` and ``inputSchema``.
            call_func: Async callable ``(tool_name, arguments) -> result``
                performing the actual MCP call.
        """
        self.name = tool_def["name"]
        self.description = tool_def.get("description", "")
        self.inputSchema = tool_def.get("inputSchema", {})
        self._call_func = call_func

    def __call__(self, **kwargs):
        """Synchronous wrapper for calling the tool.

        Must be invoked from a thread with no running event loop; any
        exception raised by the tool propagates to the caller unchanged.
        """
        # asyncio.run replaces the previous hand-rolled
        # new_event_loop/queue dance: it creates, runs and closes a fresh
        # loop, and re-raises failures directly.
        # NOTE(review): ideally this would schedule onto the client's
        # long-lived background loop via run_coroutine_threadsafe, but the
        # tool only holds _call_func, not the owning client's loop.
        return asyncio.run(self._call_func(self.name, kwargs))

    async def _async_call(self, **kwargs):
        """Async version of the tool call; await from an existing loop."""
        return await self._call_func(self.name, kwargs)

    def to_openai_tool(self):
        """Convert this tool to the OpenAI function-calling format.

        Returns:
            dict: ``{"type": "function", "function": {...}}`` definition.
        """
        # Deep-copy before fixing: the previous shallow .copy() let
        # _fix_array_schemas mutate nested dicts of self.inputSchema.
        schema = copy.deepcopy(self.inputSchema)
        self._fix_array_schemas(schema)

        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": schema
            }
        }

    def _fix_array_schemas(self, schema):
        """Recursively add a default ``items`` entry to array schemas.

        OpenAI rejects array schemas lacking ``items``; default missing
        element types to string.
        """
        if isinstance(schema, dict):
            if schema.get("type") == "array" and "items" not in schema:
                schema["items"] = {"type": "string"}
            for value in schema.values():
                if isinstance(value, dict):
                    self._fix_array_schemas(value)


class HTTPStreamingMCPClient:
    """HTTP-Streaming MCP client with the same interface as SSEMCPClient.

    Owns a dedicated asyncio event loop running on a daemon thread; all
    MCP traffic is scheduled onto that loop.
    """

    def __init__(self, server_url: str, debug: bool = False, timeout: int = 60):
        """
        Args:
            server_url: URL of the MCP server endpoint.
            debug: Enable info-level logging of initialization.
            timeout: Seconds to wait for background initialization.
        """
        self.server_url = server_url
        self.debug = debug
        self.timeout = timeout
        self.tools = []          # populated by _initialize()
        self._client = None
        self._session = None
        self._transport = None
        self._thread = None
        self._loop = None

        # Connect and discover tools on a background thread.
        self._initialize()

    def _initialize(self):
        """Start the background event loop and perform the MCP handshake."""
        init_done = threading.Event()

        def _thread_init():
            self._loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self._loop)

            async def _async_init():
                try:
                    # Create transport
                    self._transport = HTTPStreamingTransport(self.server_url)

                    # Create MCP client.
                    # NOTE(review): ClientSession() with no stream arguments
                    # mirrors the original code — confirm against the mcp
                    # package API that this is sufficient.
                    self._client = ClientSession()

                    # Initialize session with transport
                    await self._client.initialize(self._transport)

                    # Store session for later tool calls
                    self._session = self._client

                    # List available tools and wrap each as a callable
                    tools_result = await self._client.call_tool("list-tools", {})
                    if tools_result and hasattr(tools_result, 'tools'):
                        for tool_def in tools_result.tools:
                            self.tools.append(HTTPStreamingMCPTool(
                                tool_def.model_dump(),
                                self._call_tool_async
                            ))

                    if self.debug:
                        logger.info(f"HTTP Streaming MCP client initialized with {len(self.tools)} tools")

                except Exception as e:
                    logger.error(f"Failed to initialize HTTP Streaming MCP client: {e}")
                    raise

            try:
                self._loop.run_until_complete(_async_init())
            finally:
                # Always unblock the waiting constructor, even when
                # initialization fails — otherwise the main thread would
                # stall for the full timeout (review finding).
                init_done.set()

            # Keep the loop running to serve later tool calls.
            self._loop.run_forever()

        self._thread = threading.Thread(target=_thread_init, daemon=True)
        self._thread.start()

        # Wait for initialization (success or failure) up to the timeout.
        init_done.wait(timeout=self.timeout)

    async def _call_tool_async(self, tool_name: str, arguments: Dict[str, Any]):
        """Call a tool asynchronously and unwrap its text content.

        Returns a single string when the result has exactly one text
        part, a list of strings otherwise, or the raw result object when
        it carries no ``content`` attribute.
        """
        if not self._session:
            raise RuntimeError("HTTP Streaming MCP client not initialized")

        result = await self._session.call_tool(tool_name, arguments)

        # Extract content from result
        if hasattr(result, 'content'):
            content = result.content
            if len(content) == 1 and hasattr(content[0], 'text'):
                return content[0].text
            return [c.text if hasattr(c, 'text') else str(c) for c in content]
        return result

    def __iter__(self):
        """Make the client iterable, yielding its wrapped tools."""
        return iter(self.tools)

    def to_openai_tools(self):
        """Convert all discovered tools to OpenAI function format."""
        return [tool.to_openai_tool() for tool in self.tools]

    def shutdown(self):
        """Shut down the client: close the transport, then stop the loop.

        The transport must be closed while the loop is still running —
        the original implementation stopped the loop first, so the close
        coroutine scheduled afterwards could never execute.
        """
        if (self._loop and self._loop.is_running()
                and self._transport and not self._transport._closed):
            async def _close():
                await self._transport.close()

            future = asyncio.run_coroutine_threadsafe(_close(), self._loop)
            try:
                future.result(timeout=5)
            except Exception:
                logger.warning("Failed to close HTTP streaming transport cleanly")

        if self._loop and self._thread:
            self._loop.call_soon_threadsafe(self._loop.stop)
            self._thread.join(timeout=5)
52 changes: 52 additions & 0 deletions src/praisonai-ts/examples/tools/mcp-transport-selection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Example: choosing an MCP transport (SSE vs HTTP-Streaming), either
// explicitly or via auto-detection from the URL shape.
// NOTE(review): TransportType is imported but never used below.
import { Agent, MCP, TransportType } from 'praisonai-ts';

async function main() {
  // Example 1: Automatic transport detection (default behavior)
  // NOTE(review): auto mode appears to pick SSE for /sse URLs and
  // HTTP-Streaming otherwise — confirm against the MCP class.
  const mcpAuto = new MCP('http://127.0.0.1:8080/sse'); // Will use SSE
  await mcpAuto.initialize();
  console.log(`Auto-detected transport: ${mcpAuto.transportType}`);

  // Example 2: Explicit SSE transport
  const mcpSSE = new MCP('http://127.0.0.1:8080/api', 'sse');
  await mcpSSE.initialize();
  console.log(`Explicit SSE transport: ${mcpSSE.transportType}`);

  // Example 3: Explicit HTTP-Streaming transport
  const mcpHTTP = new MCP('http://127.0.0.1:8080/stream', 'http-streaming');
  await mcpHTTP.initialize();
  console.log(`Explicit HTTP-Streaming transport: ${mcpHTTP.transportType}`);

  // Example 4: Auto-detection with non-SSE URL
  const mcpAutoHTTP = new MCP('http://127.0.0.1:8080/api'); // Will use HTTP-Streaming
  await mcpAutoHTTP.initialize();
  console.log(`Auto-detected transport for non-SSE URL: ${mcpAutoHTTP.transportType}`);

  // Create tool execution functions: each MCP tool becomes an async
  // (args) => result function keyed by the tool's name, matching the
  // Agent's toolFunctions contract.
  const toolFunctions = Object.fromEntries(
    [...mcpAuto].map(tool => [
      tool.name,
      async (args: any) => tool.execute(args)
    ])
  );

  // Create agent with MCP tools
  const agent = new Agent({
    instructions: 'You are a helpful assistant with access to MCP tools.',
    name: 'MCPTransportAgent',
    tools: mcpAuto.toOpenAITools(),
    toolFunctions
  });

  // Use the agent
  const response = await agent.runSync('What tools are available?');
  console.log('Agent response:', response);

  // Cleanup: close every client so background connections are released
  await mcpAuto.close();
  await mcpSSE.close();
  await mcpHTTP.close();
  await mcpAutoHTTP.close();
}

// Run the example
main().catch(console.error);
4 changes: 3 additions & 1 deletion src/praisonai-ts/src/tools/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,6 @@ export class BaseTool implements Tool {

// Export all tool modules
export * from './arxivTools';
export * from './mcpSse';
export * from './mcp';
// Keep mcpSse export for backward compatibility
export { MCP as MCPSSE } from './mcpSse';
Loading
Loading