Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion src/uipath_mcp/_cli/_runtime/_context.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,47 @@
from typing import Optional
from enum import Enum

from uipath._cli._runtime._contracts import UiPathRuntimeContext

from .._utils._config import McpConfig


class UiPathMcpRuntimeContext(UiPathRuntimeContext):
"""Context information passed throughout the runtime execution."""
config: Optional[McpConfig] = None

class UiPathServerType(Enum):
"""Defines the different types of UiPath servers used in the MCP ecosystem.

This enum is used to identify and configure the behavior of different server types
during runtime registration and execution. Using these enum values instead of
magic numbers improves code readability and maintainability.

Attributes:
UiPath (0): Standard UiPath server for Processes, Agents, and Activities
External (1): External server types like npx, uvx
Local (2): Local MCP server (PackageType.MCPServer)
Hosted (3): Tunnel to externally hosted server
"""
UiPath = 0 # type: int - Processes, Agents, Activities
External = 1 # type: int - npx, uvx
Local = 2 # type: int - PackageType.MCPServer
Hosted = 3 # type: int - tunnel to externally hosted server

@classmethod
def from_string(cls, name: str) -> 'UiPathServerType':
"""Get enum value from string name."""
try:
return cls[name]
except KeyError:
raise ValueError(f"Unknown server type: {name}")

@classmethod
def get_description(cls, server_type: 'UiPathServerType') -> str:
"""Get description for a server type."""
descriptions = {
cls.UiPath: "Standard UiPath server for Processes, Agents, and Activities",
cls.External: "External server types like npx, uvx",
cls.Local: "Local MCP server (PackageType.MCPServer)",
cls.Hosted: "Tunnel to externally hosted server"
}
return descriptions.get(server_type, "Unknown server type")
55 changes: 25 additions & 30 deletions src/uipath_mcp/_cli/_runtime/_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@

import mcp.types as types
from mcp import ClientSession, StdioServerParameters
from opentelemetry import trace
from opentelmetry import trace
from pysignalr.client import CompletionMessage, SignalRClient
from uipath import UiPath
from uipath._cli._runtime._contracts import (
UiPathBaseRuntime,
UiPathErrorCategory,
UiPathRuntimeResult,
)
from uipath.tracing import wait_for_tracers
from uipath._cli._runtime.tracing import wait_for_tracers

from .._utils._config import McpServer
from ._context import UiPathMcpRuntimeContext
from ._context import UiPathMcpRuntimeContext, UiPathServerType
from ._exception import UiPathMcpRuntimeError
from ._session import SessionServer
from ._stdio_client import stdio_client
Expand Down Expand Up @@ -61,7 +61,7 @@ async def execute(self) -> Optional[UiPathRuntimeResult]:
if self._server is None:
return None

# Set up SignalR client
# Set up SignalR Client
signalr_url = f"{os.environ.get('UIPATH_URL')}/mcp_/wsstunnel?slug={self._server.name}&runtimeId={self._runtime_id}"

with tracer.start_as_current_span(self._server.name) as root_span:
Expand Down Expand Up @@ -138,7 +138,6 @@ async def validate(self) -> None:

async def cleanup(self) -> None:
"""Clean up all resources."""

await self._on_runtime_abort()

if self._keep_alive_task:
Expand Down Expand Up @@ -191,9 +190,9 @@ async def _handle_signalr_session_closed(self, args: list) -> None:
logger.info(
f"Session {session_id} output: {session_server.output}"
)
# If this is a sandboxed runtime for a specific session, cancel the execution
if self.sandboxed:
self._cancel_event.set()
# If this is a sandboxed runtime for a specific session, cancel the execution
if self.sandboxed:
self._cancel_event.set()

except Exception as e:
logger.error(f"Error terminating session {session_id}: {str(e)}")
Expand Down Expand Up @@ -269,24 +268,21 @@ async def _register(self) -> None:
read,
write,
):
async with ClientSession(read, write) as session:
logger.info("Initializing client session...")
# Try to initialize with timeout
try:
await asyncio.wait_for(session.initialize(), timeout=30)
initialization_successful = True
logger.info("Initialization successful")

# Only proceed if initialization was successful
tools_result = await session.list_tools()
logger.info(tools_result)
except asyncio.TimeoutError:
logger.error("Initialization timed out")
# Capture stderr output here, after the timeout
stderr_temp.seek(0)
server_stderr_output = stderr_temp.read().decode('utf-8', errors='replace')
# We'll handle this after exiting the context managers
# We don't continue with registration here - we'll do it after the context managers
logger.info("Initializing client session...")
# Try to initialize with timeout
try:
await asyncio.wait_for(read.initialize(), timeout=30)
initialization_successful = True
logger.info("Initialization successful")
# Only proceed if initialization was successful
tools_result = await read.list_tools()
logger.info(tools_result)
except asyncio.TimeoutError:
logger.error("Initialization timed out")
# Capture stderr output here, after the timeout
stderr_temp.seek(0)
server_stderr_output = stderr_temp.read().decode('utf-8', errors='replace')
# We'll handle this after exiting the context managers

except BaseException as e:
logger.error(f"Error during server initialization: {e}")
Expand All @@ -313,7 +309,7 @@ async def _register(self) -> None:
"Name": self._server.name,
"Slug": self._server.name,
"Version": "1.0.0",
"Type": 1 if self.sandboxed else 3,
"Type": UiPathServerType.External.value if self.sandboxed else UiPathServerType.Hosted.value,
},
"tools": [],
}
Expand Down Expand Up @@ -346,7 +342,7 @@ async def _register(self) -> None:
async def _on_session_start_error(self, session_id: str) -> None:
"""
Sends a dummy initialization failure message to abort the already connected client.
Sanboxed runtimes are triggered by new client connections.
Sandboxed runtimes are triggered by new client connections.
"""
try:
response = await self._uipath.api_client.request_async(
Expand Down Expand Up @@ -402,7 +398,6 @@ async def on_keep_alive_response(response: CompletionMessage) -> None:
logger.error(f"Error during keep-alive: {e}")
await asyncio.sleep(60)


async def _on_runtime_abort(self) -> None:
"""
Sends a runtime abort signalr to terminate all connected sessions.
Expand All @@ -418,7 +413,7 @@ async def _on_runtime_abort(self) -> None:
)
else:
logger.error(
f"Error sending runtime abort signalr to UiPath MCP Server: {response.status_code} - {response.text}"
f"Error sending runtime abort signal to UiPath MCP Server: {response.status_code} - {response.text}"
)
except Exception as e:
logger.error(
Expand Down