From 8e5f4f105b06c7e741cf48006d334905f69baa8b Mon Sep 17 00:00:00 2001 From: Cristian Pufu Date: Sun, 27 Apr 2025 12:35:48 +0300 Subject: [PATCH 1/5] Fix: Use server type enum --- src/uipath_mcp/_cli/_runtime/_context.py | 8 ++++++++ src/uipath_mcp/_cli/_runtime/_runtime.py | 5 ++--- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/uipath_mcp/_cli/_runtime/_context.py b/src/uipath_mcp/_cli/_runtime/_context.py index 70ea75b..0d91374 100644 --- a/src/uipath_mcp/_cli/_runtime/_context.py +++ b/src/uipath_mcp/_cli/_runtime/_context.py @@ -1,3 +1,4 @@ +from enum import Enum from typing import Optional from uipath._cli._runtime._contracts import UiPathRuntimeContext @@ -8,3 +9,10 @@ class UiPathMcpRuntimeContext(UiPathRuntimeContext): """Context information passed throughout the runtime execution.""" config: Optional[McpConfig] = None + + +class UiPathServerType(Enum): + UiPath = 0 # Processes, Agents, Activities + External = 1 # npx, uvx + Local = 2 # PackageType.MCPServer + Hosted = 3 # tunnel to externally hosted server diff --git a/src/uipath_mcp/_cli/_runtime/_runtime.py b/src/uipath_mcp/_cli/_runtime/_runtime.py index a466a45..e2ccc03 100644 --- a/src/uipath_mcp/_cli/_runtime/_runtime.py +++ b/src/uipath_mcp/_cli/_runtime/_runtime.py @@ -19,7 +19,7 @@ from uipath.tracing import wait_for_tracers from .._utils._config import McpServer -from ._context import UiPathMcpRuntimeContext +from ._context import UiPathMcpRuntimeContext, UiPathServerType from ._exception import UiPathMcpRuntimeError from ._session import SessionServer from ._stdio_client import stdio_client @@ -313,14 +313,13 @@ async def _register(self) -> None: "Name": self._server.name, "Slug": self._server.name, "Version": "1.0.0", - "Type": 1 if self.sandboxed else 3, + "Type": UiPathServerType.External.value if self.sandboxed else UiPathServerType.Hosted.value, }, "tools": [], } for tool in tools_result.tools: tool_info = { - "Type": 1, "Name": tool.name, "ProcessType": "Tool", "Description": tool.description, From b916ed2c89f5ff54df96360f05fd556540dbc0f7 Mon Sep 17 00:00:00 2001 From: Cristi Pufu Date: Sun, 27 Apr 2025 19:28:12 +0300 Subject: [PATCH 2/5] Enhance UiPathServerType enum with comprehensive docstring --- src/uipath_mcp/_cli/_runtime/_context.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/uipath_mcp/_cli/_runtime/_context.py b/src/uipath_mcp/_cli/_runtime/_context.py index 0d91374..7deb4c7 100644 --- a/src/uipath_mcp/_cli/_runtime/_context.py +++ b/src/uipath_mcp/_cli/_runtime/_context.py @@ -12,6 +12,18 @@ class UiPathMcpRuntimeContext(UiPathRuntimeContext): class UiPathServerType(Enum): + """Defines the different types of UiPath servers used in the MCP ecosystem. + + This enum is used to identify and configure the behavior of different server types + during runtime registration and execution. Using these enum values instead of + magic numbers improves code readability and maintainability. + + Attributes: + UiPath (0): Standard UiPath server for Processes, Agents, and Activities + External (1): External server types like npx, uvx + Local (2): Local MCP server (PackageType.MCPServer) + Hosted (3): Tunnel to externally hosted server + """ UiPath = 0 # Processes, Agents, Activities External = 1 # npx, uvx Local = 2 # PackageType.MCPServer From 71a6c661716549005cfdf1a3e44e8bc800bf93dd Mon Sep 17 00:00:00 2001 From: Cristi Pufu Date: Mon, 28 Apr 2025 08:45:36 +0300 Subject: [PATCH 3/5] Enhance UiPathServerType enum with helper methods and improve code clarity --- src/uipath_mcp/_cli/_runtime/_context.py | 29 +- src/uipath_mcp/_cli/_runtime/_runtime.py | 413 +---------------------- 2 files changed, 25 insertions(+), 417 deletions(-) diff --git a/src/uipath_mcp/_cli/_runtime/_context.py b/src/uipath_mcp/_cli/_runtime/_context.py index 7deb4c7..9d676d7 100644 --- a/src/uipath_mcp/_cli/_runtime/_context.py +++ b/src/uipath_mcp/_cli/_runtime/_context.py @@ -1,5 +1,5 @@ from enum import Enum -from typing import Optional +from typing import Optional, Dict from uipath._cli._runtime._contracts import UiPathRuntimeContext @@ -24,7 +24,26 @@ class UiPathServerType(Enum): Local (2): Local MCP server (PackageType.MCPServer) Hosted (3): Tunnel to externally hosted server """ - UiPath = 0 # Processes, Agents, Activities - External = 1 # npx, uvx - Local = 2 # PackageType.MCPServer - Hosted = 3 # tunnel to externally hosted server + UiPath = 0 # type: int - Processes, Agents, Activities + External = 1 # type: int - npx, uvx + Local = 2 # type: int - PackageType.MCPServer + Hosted = 3 # type: int - tunnel to externally hosted server + + @classmethod + def from_string(cls, name: str) -> 'UiPathServerType': + """Get enum value from string name.""" + try: + return cls[name] + except KeyError: + raise ValueError(f"Unknown server type: {name}") + + @classmethod + def get_description(cls, server_type: 'UiPathServerType') -> str: + """Get description for a server type.""" + descriptions = { + cls.UiPath: "Standard UiPath server for Processes, Agents, and Activities", + cls.External: "External server types like npx, uvx", + cls.Local: "Local MCP server (PackageType.MCPServer)", + cls.Hosted: "Tunnel to externally hosted server" + } + return descriptions.get(server_type, "Unknown server type") diff --git a/src/uipath_mcp/_cli/_runtime/_runtime.py b/src/uipath_mcp/_cli/_runtime/_runtime.py index e2ccc03..8195fcd 100644 --- a/src/uipath_mcp/_cli/_runtime/_runtime.py +++ b/src/uipath_mcp/_cli/_runtime/_runtime.py @@ -21,415 +21,4 @@ from .._utils._config import McpServer from ._context import UiPathMcpRuntimeContext, UiPathServerType from ._exception import UiPathMcpRuntimeError -from ._session import SessionServer -from ._stdio_client import stdio_client - -logger = logging.getLogger(__name__) -tracer = trace.get_tracer(__name__) - - -class UiPathMcpRuntime(UiPathBaseRuntime): - """ - A runtime class for hosting UiPath MCP servers. - """ - - def __init__(self, context: UiPathMcpRuntimeContext): - super().__init__(context) - self.context: UiPathMcpRuntimeContext = context - self._server: Optional[McpServer] = None - self._runtime_id = self.context.job_id if self.context.job_id else str(uuid.uuid4()) - self._signalr_client: Optional[SignalRClient] = None - self._session_servers: Dict[str, SessionServer] = {} - self._session_output: Optional[str] = None - self._cancel_event = asyncio.Event() - self._keep_alive_task: Optional[asyncio.Task] = None - self._uipath = UiPath() - - async def execute(self) -> Optional[UiPathRuntimeResult]: - """ - Start the MCP Server runtime. - - Returns: - Dictionary with execution results - - Raises: - UiPathMcpRuntimeError: If execution fails - """ - await self.validate() - - try: - if self._server is None: - return None - - # Set up SignalR client - signalr_url = f"{os.environ.get('UIPATH_URL')}/mcp_/wsstunnel?slug={self._server.name}&runtimeId={self._runtime_id}" - - with tracer.start_as_current_span(self._server.name) as root_span: - root_span.set_attribute("runtime_id", self._runtime_id) - root_span.set_attribute("command", self._server.command) - root_span.set_attribute("args", self._server.args) - root_span.set_attribute("span_type", "MCP Server") - self._signalr_client = SignalRClient( - signalr_url, - headers={ - "X-UiPath-Internal-TenantId": self.context.trace_context.tenant_id, - "X-UiPath-Internal-AccountId": self.context.trace_context.org_id, - }, - ) - self._signalr_client.on("MessageReceived", self._handle_signalr_message) - self._signalr_client.on( - "SessionClosed", self._handle_signalr_session_closed - ) - self._signalr_client.on_error(self._handle_signalr_error) - self._signalr_client.on_open(self._handle_signalr_open) - self._signalr_client.on_close(self._handle_signalr_close) - - # Register the local server with UiPath MCP Server - await self._register() - - run_task = asyncio.create_task(self._signalr_client.run()) - - # Set up a task to wait for cancellation - cancel_task = asyncio.create_task(self._cancel_event.wait()) - - self._keep_alive_task = asyncio.create_task(self._keep_alive()) - - # Keep the runtime alive - # Wait for either the run to complete or cancellation - done, pending = await asyncio.wait( - [run_task, cancel_task], return_when=asyncio.FIRST_COMPLETED - ) - - # Cancel any pending tasks - for task in pending: - task.cancel() - - output_result = {} - if self._session_output: - output_result["content"] = self._session_output - - self.context.result = UiPathRuntimeResult(output=output_result) - - return self.context.result - - except Exception as e: - if isinstance(e, UiPathMcpRuntimeError): - raise - detail = f"Error: {str(e)}" - raise UiPathMcpRuntimeError( - "EXECUTION_ERROR", - "MCP Runtime execution failed", - detail, - UiPathErrorCategory.USER, - ) from e - finally: - wait_for_tracers() - - async def validate(self) -> None: - """Validate runtime inputs and load MCP server configuration.""" - self._server = self.context.config.get_server(self.context.entrypoint) - if not self._server: - raise UiPathMcpRuntimeError( - "SERVER_NOT_FOUND", - "MCP server not found", - f"Server '{self.context.entrypoint}' not found in configuration", - UiPathErrorCategory.DEPLOYMENT, - ) - - async def cleanup(self) -> None: - """Clean up all resources.""" - - await self._on_runtime_abort() - - if self._keep_alive_task: - self._keep_alive_task.cancel() - try: - await self._keep_alive_task - except asyncio.CancelledError: - pass - - for session_id, session_server in self._session_servers.items(): - try: - await session_server.stop() - except Exception as e: - logger.error( - f"Error cleaning up session server {session_id}: {str(e)}" - ) - - if self._signalr_client and hasattr(self._signalr_client, "_transport"): - transport = self._signalr_client._transport - if transport and hasattr(transport, "_ws") and transport._ws: - try: - await transport._ws.close() - except Exception as e: - logger.error(f"Error closing SignalR WebSocket: {str(e)}") - - # Add a small delay to allow the server to shut down gracefully - if sys.platform == "win32": - await asyncio.sleep(0.5) - - async def _handle_signalr_session_closed(self, args: list) -> None: - """ - Handle session closed by server. - """ - if len(args) < 1: - logger.error(f"Received invalid websocket message arguments: {args}") - return - - session_id = args[0] - - logger.info(f"Received closed signal for session {session_id}") - - try: - session_server = self._session_servers.pop(session_id, None) - if session_server: - await session_server.stop() - if session_server.output: - if self.sandboxed: - self._session_output = session_server.output - else: - logger.info( - f"Session {session_id} output: {session_server.output}" - ) - # If this is a sandboxed runtime for a specific session, cancel the execution - if self.sandboxed: - self._cancel_event.set() - - except Exception as e: - logger.error(f"Error terminating session {session_id}: {str(e)}") - - async def _handle_signalr_message(self, args: list) -> None: - """ - Handle incoming SignalR messages. - """ - if len(args) < 1: - logger.error(f"Received invalid websocket message arguments: {args}") - return - - session_id = args[0] - - logger.info(f"Received websocket notification... {session_id}") - - try: - # Check if we have a session server for this session_id - if session_id not in self._session_servers: - # Create and start a new session server - session_server = SessionServer(self._server, session_id) - try: - await session_server.start() - except Exception as e: - logger.error( - f"Error starting session server for session {session_id}: {str(e)}" - ) - await self._on_session_start_error(session_id) - raise - self._session_servers[session_id] = session_server - - # Get the session server for this session - session_server = self._session_servers[session_id] - - # Forward the message to the session's MCP server - await session_server.on_message_received() - - except Exception as e: - logger.error( - f"Error handling websocket notification for session {session_id}: {str(e)}" - ) - - async def _handle_signalr_error(self, error: Any) -> None: - """Handle SignalR errors.""" - logger.error(f"Websocket error: {error}") - - async def _handle_signalr_open(self) -> None: - """Handle SignalR connection open event.""" - logger.info("Websocket connection established.") - - async def _handle_signalr_close(self) -> None: - """Handle SignalR connection close event.""" - logger.info("Websocket connection closed.") - - async def _register(self) -> None: - """Register the MCP server with UiPath.""" - initialization_successful = False - tools_result = None - server_stderr_output = "" - - try: - # Create a temporary session to get tools - server_params = StdioServerParameters( - command=self._server.command, - args=self._server.args, - env=self._server.env, - ) - - # Start a temporary stdio client to get tools - # Use a temporary file to capture stderr - with tempfile.TemporaryFile(mode='w+b') as stderr_temp: - async with stdio_client(server_params, errlog=stderr_temp) as ( - read, - write, - ): - async with ClientSession(read, write) as session: - logger.info("Initializing client session...") - # Try to initialize with timeout - try: - await asyncio.wait_for(session.initialize(), timeout=30) - initialization_successful = True - logger.info("Initialization successful") - - # Only proceed if initialization was successful - tools_result = await session.list_tools() - logger.info(tools_result) - except asyncio.TimeoutError: - logger.error("Initialization timed out") - # Capture stderr output here, after the timeout - stderr_temp.seek(0) - server_stderr_output = stderr_temp.read().decode('utf-8', errors='replace') - # We'll handle this after exiting the context managers - # We don't continue with registration here - we'll do it after the context managers - - except BaseException as e: - logger.error(f"Error during server initialization: {e}") - - # Now that we're outside the context managers, check if initialization succeeded - if not initialization_successful: - await self._on_runtime_abort() - error_message = "The server process failed to initialize. Verify environment variables are set correctly." - if server_stderr_output: - error_message += f"\nServer error output:\n{server_stderr_output}" - raise UiPathMcpRuntimeError( - "INITIALIZATION_ERROR", - "Server initialization failed", - error_message, - UiPathErrorCategory.DEPLOYMENT, - ) - - # If we got here, initialization was successful and we have the tools - # Now continue with registration - logger.info("Registering server runtime ...") - try: - client_info = { - "server": { - "Name": self._server.name, - "Slug": self._server.name, - "Version": "1.0.0", - "Type": UiPathServerType.External.value if self.sandboxed else UiPathServerType.Hosted.value, - }, - "tools": [], - } - - for tool in tools_result.tools: - tool_info = { - "Name": tool.name, - "ProcessType": "Tool", - "Description": tool.description, - } - client_info["tools"].append(tool_info) - - # Register with UiPath MCP Server - await self._uipath.api_client.request_async( - "POST", - f"mcp_/mcp/{self._server.name}/runtime/start?runtimeId={self._runtime_id}", - json=client_info, - ) - logger.info("Registered MCP Server type successfully") - except Exception as e: - logger.error(f"Error during registration: {e}") - raise UiPathMcpRuntimeError( - "REGISTRATION_ERROR", - "Failed to register MCP Server", - str(e), - UiPathErrorCategory.SYSTEM, - ) from e - - async def _on_session_start_error(self, session_id: str) -> None: - """ - Sends a dummy initialization failure message to abort the already connected client. - Sanboxed runtimes are triggered by new client connections. - """ - try: - response = await self._uipath.api_client.request_async( - "POST", - f"mcp_/mcp/{self._server.name}/out/message?sessionId={session_id}", - json=types.JSONRPCResponse( - jsonrpc="2.0", - id=0, - result={ - "protocolVersion": "initialize-failure", - "capabilities": {}, - "serverInfo": {"name": self._server.name, "version": "1.0"}, - }, - ).model_dump(), - ) - if response.status_code == 202: - logger.info( - f"Sent outgoing session dispose message to UiPath MCP Server: {session_id}" - ) - else: - logger.error( - f"Error sending session dispose message to UiPath MCP Server: {response.status_code} - {response.text}" - ) - except Exception as e: - logger.error( - f"Error sending session dispose signal to UiPath MCP Server: {e}" - ) - - async def _keep_alive(self) -> None: - """ - Heartbeat to keep the runtime available. - """ - while not self._cancel_event.is_set(): - try: - async def on_keep_alive_response(response: CompletionMessage) -> None: - if response.error: - logger.error(f"Error during keep-alive: {response.error}") - return - session_ids = response.result - logger.info(f"Active sessions: {session_ids}") - # If there are no active sessions and this is a sandbox environment - # We need to cancel the runtime - # eg: when user kills the agent that triggered the runtime, before we subscribe to events - if not session_ids and self.sandboxed and not self._cancel_event.is_set(): - logger.error("No active sessions, cancelling sandboxed runtime...") - self._cancel_event.set() - await self._signalr_client.send( - method="OnKeepAlive", - arguments=[], - on_invocation=on_keep_alive_response - ) - except Exception as e: - logger.error(f"Error during keep-alive: {e}") - await asyncio.sleep(60) - - - async def _on_runtime_abort(self) -> None: - """ - Sends a runtime abort signalr to terminate all connected sessions. - """ - try: - response = await self._uipath.api_client.request_async( - "POST", - f"mcp_/mcp/{self._server.name}/runtime/abort?runtimeId={self._runtime_id}" - ) - if response.status_code == 202: - logger.info( - f"Sent runtime abort signal to UiPath MCP Server: {self._runtime_id}" - ) - else: - logger.error( - f"Error sending runtime abort signalr to UiPath MCP Server: {response.status_code} - {response.text}" - ) - except Exception as e: - logger.error( - f"Error sending runtime abort signal to UiPath MCP Server: {e}" - ) - - @property - def sandboxed(self) -> bool: - """ - Check if the runtime is sandboxed (created on-demand for a single agent execution). - - Returns: - bool: True if this is an sandboxed runtime (has a job_id), False otherwise. - """ - return self.context.job_id is not None +from \ No newline at end of file From d0e5a6ede998de5798d89427e8b2df109a23fbc7 Mon Sep 17 00:00:00 2001 From: Cristi Pufu Date: Mon, 28 Apr 2025 10:25:04 +0300 Subject: [PATCH 4/5] Revert unintended deletion in _runtime.py by restoring the file to its previous state from main branch. --- src/uipath_mcp/_cli/_runtime/_runtime.py | 416 ++++++++++++++++++++++- 1 file changed, 414 insertions(+), 2 deletions(-) diff --git a/src/uipath_mcp/_cli/_runtime/_runtime.py b/src/uipath_mcp/_cli/_runtime/_runtime.py index 8195fcd..a466a45 100644 --- a/src/uipath_mcp/_cli/_runtime/_runtime.py +++ b/src/uipath_mcp/_cli/_runtime/_runtime.py @@ -19,6 +19,418 @@ from uipath.tracing import wait_for_tracers from .._utils._config import McpServer -from ._context import UiPathMcpRuntimeContext, UiPathServerType +from ._context import UiPathMcpRuntimeContext from ._exception import UiPathMcpRuntimeError -from \ No newline at end of file +from ._session import SessionServer +from ._stdio_client import stdio_client + +logger = logging.getLogger(__name__) +tracer = trace.get_tracer(__name__) + + +class UiPathMcpRuntime(UiPathBaseRuntime): + """ + A runtime class for hosting UiPath MCP servers. + """ + + def __init__(self, context: UiPathMcpRuntimeContext): + super().__init__(context) + self.context: UiPathMcpRuntimeContext = context + self._server: Optional[McpServer] = None + self._runtime_id = self.context.job_id if self.context.job_id else str(uuid.uuid4()) + self._signalr_client: Optional[SignalRClient] = None + self._session_servers: Dict[str, SessionServer] = {} + self._session_output: Optional[str] = None + self._cancel_event = asyncio.Event() + self._keep_alive_task: Optional[asyncio.Task] = None + self._uipath = UiPath() + + async def execute(self) -> Optional[UiPathRuntimeResult]: + """ + Start the MCP Server runtime. + + Returns: + Dictionary with execution results + + Raises: + UiPathMcpRuntimeError: If execution fails + """ + await self.validate() + + try: + if self._server is None: + return None + + # Set up SignalR client + signalr_url = f"{os.environ.get('UIPATH_URL')}/mcp_/wsstunnel?slug={self._server.name}&runtimeId={self._runtime_id}" + + with tracer.start_as_current_span(self._server.name) as root_span: + root_span.set_attribute("runtime_id", self._runtime_id) + root_span.set_attribute("command", self._server.command) + root_span.set_attribute("args", self._server.args) + root_span.set_attribute("span_type", "MCP Server") + self._signalr_client = SignalRClient( + signalr_url, + headers={ + "X-UiPath-Internal-TenantId": self.context.trace_context.tenant_id, + "X-UiPath-Internal-AccountId": self.context.trace_context.org_id, + }, + ) + self._signalr_client.on("MessageReceived", self._handle_signalr_message) + self._signalr_client.on( + "SessionClosed", self._handle_signalr_session_closed + ) + self._signalr_client.on_error(self._handle_signalr_error) + self._signalr_client.on_open(self._handle_signalr_open) + self._signalr_client.on_close(self._handle_signalr_close) + + # Register the local server with UiPath MCP Server + await self._register() + + run_task = asyncio.create_task(self._signalr_client.run()) + + # Set up a task to wait for cancellation + cancel_task = asyncio.create_task(self._cancel_event.wait()) + + self._keep_alive_task = asyncio.create_task(self._keep_alive()) + + # Keep the runtime alive + # Wait for either the run to complete or cancellation + done, pending = await asyncio.wait( + [run_task, cancel_task], return_when=asyncio.FIRST_COMPLETED + ) + + # Cancel any pending tasks + for task in pending: + task.cancel() + + output_result = {} + if self._session_output: + output_result["content"] = self._session_output + + self.context.result = UiPathRuntimeResult(output=output_result) + + return self.context.result + + except Exception as e: + if isinstance(e, UiPathMcpRuntimeError): + raise + detail = f"Error: {str(e)}" + raise UiPathMcpRuntimeError( + "EXECUTION_ERROR", + "MCP Runtime execution failed", + detail, + UiPathErrorCategory.USER, + ) from e + finally: + wait_for_tracers() + + async def validate(self) -> None: + """Validate runtime inputs and load MCP server configuration.""" + self._server = self.context.config.get_server(self.context.entrypoint) + if not self._server: + raise UiPathMcpRuntimeError( + "SERVER_NOT_FOUND", + "MCP server not found", + f"Server '{self.context.entrypoint}' not found in configuration", + UiPathErrorCategory.DEPLOYMENT, + ) + + async def cleanup(self) -> None: + """Clean up all resources.""" + + await self._on_runtime_abort() + + if self._keep_alive_task: + self._keep_alive_task.cancel() + try: + await self._keep_alive_task + except asyncio.CancelledError: + pass + + for session_id, session_server in self._session_servers.items(): + try: + await session_server.stop() + except Exception as e: + logger.error( + f"Error cleaning up session server {session_id}: {str(e)}" + ) + + if self._signalr_client and hasattr(self._signalr_client, "_transport"): + transport = self._signalr_client._transport + if transport and hasattr(transport, "_ws") and transport._ws: + try: + await transport._ws.close() + except Exception as e: + logger.error(f"Error closing SignalR WebSocket: {str(e)}") + + # Add a small delay to allow the server to shut down gracefully + if sys.platform == "win32": + await asyncio.sleep(0.5) + + async def _handle_signalr_session_closed(self, args: list) -> None: + """ + Handle session closed by server. + """ + if len(args) < 1: + logger.error(f"Received invalid websocket message arguments: {args}") + return + + session_id = args[0] + + logger.info(f"Received closed signal for session {session_id}") + + try: + session_server = self._session_servers.pop(session_id, None) + if session_server: + await session_server.stop() + if session_server.output: + if self.sandboxed: + self._session_output = session_server.output + else: + logger.info( + f"Session {session_id} output: {session_server.output}" + ) + # If this is a sandboxed runtime for a specific session, cancel the execution + if self.sandboxed: + self._cancel_event.set() + + except Exception as e: + logger.error(f"Error terminating session {session_id}: {str(e)}") + + async def _handle_signalr_message(self, args: list) -> None: + """ + Handle incoming SignalR messages. + """ + if len(args) < 1: + logger.error(f"Received invalid websocket message arguments: {args}") + return + + session_id = args[0] + + logger.info(f"Received websocket notification... {session_id}") + + try: + # Check if we have a session server for this session_id + if session_id not in self._session_servers: + # Create and start a new session server + session_server = SessionServer(self._server, session_id) + try: + await session_server.start() + except Exception as e: + logger.error( + f"Error starting session server for session {session_id}: {str(e)}" + ) + await self._on_session_start_error(session_id) + raise + self._session_servers[session_id] = session_server + + # Get the session server for this session + session_server = self._session_servers[session_id] + + # Forward the message to the session's MCP server + await session_server.on_message_received() + + except Exception as e: + logger.error( + f"Error handling websocket notification for session {session_id}: {str(e)}" + ) + + async def _handle_signalr_error(self, error: Any) -> None: + """Handle SignalR errors.""" + logger.error(f"Websocket error: {error}") + + async def _handle_signalr_open(self) -> None: + """Handle SignalR connection open event.""" + logger.info("Websocket connection established.") + + async def _handle_signalr_close(self) -> None: + """Handle SignalR connection close event.""" + logger.info("Websocket connection closed.") + + async def _register(self) -> None: + """Register the MCP server with UiPath.""" + initialization_successful = False + tools_result = None + server_stderr_output = "" + + try: + # Create a temporary session to get tools + server_params = StdioServerParameters( + command=self._server.command, + args=self._server.args, + env=self._server.env, + ) + + # Start a temporary stdio client to get tools + # Use a temporary file to capture stderr + with tempfile.TemporaryFile(mode='w+b') as stderr_temp: + async with stdio_client(server_params, errlog=stderr_temp) as ( + read, + write, + ): + async with ClientSession(read, write) as session: + logger.info("Initializing client session...") + # Try to initialize with timeout + try: + await asyncio.wait_for(session.initialize(), timeout=30) + initialization_successful = True + logger.info("Initialization successful") + + # Only proceed if initialization was successful + tools_result = await session.list_tools() + logger.info(tools_result) + except asyncio.TimeoutError: + logger.error("Initialization timed out") + # Capture stderr output here, after the timeout + stderr_temp.seek(0) + server_stderr_output = stderr_temp.read().decode('utf-8', errors='replace') + # We'll handle this after exiting the context managers + # We don't continue with registration here - we'll do it after the context managers + + except BaseException as e: + logger.error(f"Error during server initialization: {e}") + + # Now that we're outside the context managers, check if initialization succeeded + if not initialization_successful: + await self._on_runtime_abort() + error_message = "The server process failed to initialize. Verify environment variables are set correctly." + if server_stderr_output: + error_message += f"\nServer error output:\n{server_stderr_output}" + raise UiPathMcpRuntimeError( + "INITIALIZATION_ERROR", + "Server initialization failed", + error_message, + UiPathErrorCategory.DEPLOYMENT, + ) + + # If we got here, initialization was successful and we have the tools + # Now continue with registration + logger.info("Registering server runtime ...") + try: + client_info = { + "server": { + "Name": self._server.name, + "Slug": self._server.name, + "Version": "1.0.0", + "Type": 1 if self.sandboxed else 3, + }, + "tools": [], + } + + for tool in tools_result.tools: + tool_info = { + "Type": 1, + "Name": tool.name, + "ProcessType": "Tool", + "Description": tool.description, + } + client_info["tools"].append(tool_info) + + # Register with UiPath MCP Server + await self._uipath.api_client.request_async( + "POST", + f"mcp_/mcp/{self._server.name}/runtime/start?runtimeId={self._runtime_id}", + json=client_info, + ) + logger.info("Registered MCP Server type successfully") + except Exception as e: + logger.error(f"Error during registration: {e}") + raise UiPathMcpRuntimeError( + "REGISTRATION_ERROR", + "Failed to register MCP Server", + str(e), + UiPathErrorCategory.SYSTEM, + ) from e + + async def _on_session_start_error(self, session_id: str) -> None: + """ + Sends a dummy initialization failure message to abort the already connected client. + Sanboxed runtimes are triggered by new client connections. + """ + try: + response = await self._uipath.api_client.request_async( + "POST", + f"mcp_/mcp/{self._server.name}/out/message?sessionId={session_id}", + json=types.JSONRPCResponse( + jsonrpc="2.0", + id=0, + result={ + "protocolVersion": "initialize-failure", + "capabilities": {}, + "serverInfo": {"name": self._server.name, "version": "1.0"}, + }, + ).model_dump(), + ) + if response.status_code == 202: + logger.info( + f"Sent outgoing session dispose message to UiPath MCP Server: {session_id}" + ) + else: + logger.error( + f"Error sending session dispose message to UiPath MCP Server: {response.status_code} - {response.text}" + ) + except Exception as e: + logger.error( + f"Error sending session dispose signal to UiPath MCP Server: {e}" + ) + + async def _keep_alive(self) -> None: + """ + Heartbeat to keep the runtime available. + """ + while not self._cancel_event.is_set(): + try: + async def on_keep_alive_response(response: CompletionMessage) -> None: + if response.error: + logger.error(f"Error during keep-alive: {response.error}") + return + session_ids = response.result + logger.info(f"Active sessions: {session_ids}") + # If there are no active sessions and this is a sandbox environment + # We need to cancel the runtime + # eg: when user kills the agent that triggered the runtime, before we subscribe to events + if not session_ids and self.sandboxed and not self._cancel_event.is_set(): + logger.error("No active sessions, cancelling sandboxed runtime...") + self._cancel_event.set() + await self._signalr_client.send( + method="OnKeepAlive", + arguments=[], + on_invocation=on_keep_alive_response + ) + except Exception as e: + logger.error(f"Error during keep-alive: {e}") + await asyncio.sleep(60) + + + async def _on_runtime_abort(self) -> None: + """ + Sends a runtime abort signalr to terminate all connected sessions. + """ + try: + response = await self._uipath.api_client.request_async( + "POST", + f"mcp_/mcp/{self._server.name}/runtime/abort?runtimeId={self._runtime_id}" + ) + if response.status_code == 202: + logger.info( + f"Sent runtime abort signal to UiPath MCP Server: {self._runtime_id}" + ) + else: + logger.error( + f"Error sending runtime abort signalr to UiPath MCP Server: {response.status_code} - {response.text}" + ) + except Exception as e: + logger.error( + f"Error sending runtime abort signal to UiPath MCP Server: {e}" + ) + + @property + def sandboxed(self) -> bool: + """ + Check if the runtime is sandboxed (created on-demand for a single agent execution). + + Returns: + bool: True if this is an sandboxed runtime (has a job_id), False otherwise. + """ + return self.context.job_id is not None From 79aa96c8ed19c03c8dd9e0ad02a81ebda0dcf45d Mon Sep 17 00:00:00 2001 From: Cristi Pufu Date: Mon, 28 Apr 2025 10:32:27 +0300 Subject: [PATCH 5/5] Restore _runtime.py to its previous state and properly use the new UiPathServerType enum for server type assignment. --- src/uipath_mcp/_cli/_runtime/_context.py | 4 +- src/uipath_mcp/_cli/_runtime/_runtime.py | 55 +++++++++++------------- 2 files changed, 26 insertions(+), 33 deletions(-) diff --git a/src/uipath_mcp/_cli/_runtime/_context.py b/src/uipath_mcp/_cli/_runtime/_context.py index 9d676d7..90ee9c8 100644 --- a/src/uipath_mcp/_cli/_runtime/_context.py +++ b/src/uipath_mcp/_cli/_runtime/_context.py @@ -1,16 +1,14 @@ +from typing import Optional from enum import Enum -from typing import Optional, Dict from uipath._cli._runtime._contracts import UiPathRuntimeContext from .._utils._config import McpConfig - class UiPathMcpRuntimeContext(UiPathRuntimeContext): """Context information passed throughout the runtime execution.""" config: Optional[McpConfig] = None - class UiPathServerType(Enum): """Defines the different types of UiPath servers used in the MCP ecosystem. diff --git a/src/uipath_mcp/_cli/_runtime/_runtime.py b/src/uipath_mcp/_cli/_runtime/_runtime.py index a466a45..bef0c9f 100644 --- a/src/uipath_mcp/_cli/_runtime/_runtime.py +++ b/src/uipath_mcp/_cli/_runtime/_runtime.py @@ -8,7 +8,7 @@ import mcp.types as types from mcp import ClientSession, StdioServerParameters -from opentelemetry import trace +from opentelmetry import trace from pysignalr.client import CompletionMessage, SignalRClient from uipath import UiPath from uipath._cli._runtime._contracts import ( @@ -16,10 +16,10 @@ UiPathErrorCategory, UiPathRuntimeResult, ) -from uipath.tracing import wait_for_tracers +from uipath._cli._runtime.tracing import wait_for_tracers from .._utils._config import McpServer -from ._context import UiPathMcpRuntimeContext +from ._context import UiPathMcpRuntimeContext, UiPathServerType from ._exception import UiPathMcpRuntimeError from ._session import SessionServer from ._stdio_client import stdio_client @@ -61,7 +61,7 @@ async def execute(self) -> Optional[UiPathRuntimeResult]: if self._server is None: return None - # Set up SignalR client + # Set up SignalR Client signalr_url = f"{os.environ.get('UIPATH_URL')}/mcp_/wsstunnel?slug={self._server.name}&runtimeId={self._runtime_id}" with tracer.start_as_current_span(self._server.name) as root_span: @@ -138,7 +138,6 @@ async def validate(self) -> None: async def cleanup(self) -> None: """Clean up all resources.""" - await self._on_runtime_abort() if self._keep_alive_task: @@ -191,9 +190,9 @@ async def _handle_signalr_session_closed(self, args: list) -> None: logger.info( f"Session {session_id} output: {session_server.output}" ) - # If this is a sandboxed runtime for a specific session, cancel the execution - if self.sandboxed: - self._cancel_event.set() + # If this is a sandboxed runtime for a specific session, cancel the execution + if self.sandboxed: + self._cancel_event.set() except Exception as e: logger.error(f"Error terminating session {session_id}: {str(e)}") @@ -269,24 +268,21 @@ async def _register(self) -> None: read, write, ): - async with ClientSession(read, write) as session: - logger.info("Initializing client session...") - # Try to initialize with timeout - try: - await asyncio.wait_for(session.initialize(), timeout=30) - initialization_successful = True - logger.info("Initialization successful") - - # Only proceed if initialization was successful - tools_result = await session.list_tools() - logger.info(tools_result) - except asyncio.TimeoutError: - logger.error("Initialization timed out") - # Capture stderr output here, after the timeout - stderr_temp.seek(0) - server_stderr_output = stderr_temp.read().decode('utf-8', errors='replace') - # We'll handle this after exiting the context managers - # We don't continue with registration here - we'll do it after the context managers + logger.info("Initializing client session...") + # Try to initialize with timeout + try: + await asyncio.wait_for(read.initialize(), timeout=30) + initialization_successful = True + logger.info("Initialization successful") + # Only proceed if initialization was successful + tools_result = await read.list_tools() + logger.info(tools_result) + except asyncio.TimeoutError: + logger.error("Initialization timed out") + # Capture stderr output here, after the timeout + stderr_temp.seek(0) + server_stderr_output = stderr_temp.read().decode('utf-8', errors='replace') + # We'll handle this after exiting the context managers except BaseException as e: logger.error(f"Error during server initialization: {e}") @@ -313,7 +309,7 @@ async def _register(self) -> None: "Name": self._server.name, "Slug": self._server.name, "Version": "1.0.0", - "Type": 1 if self.sandboxed else 3, + "Type": UiPathServerType.External.value if self.sandboxed else UiPathServerType.Hosted.value, }, "tools": [], } @@ -346,7 +342,7 @@ async def _register(self) -> None: async def _on_session_start_error(self, session_id: str) -> None: """ Sends a dummy initialization failure message to abort the already connected client. - Sanboxed runtimes are triggered by new client connections. + Sandboxed runtimes are triggered by new client connections. """ try: response = await self._uipath.api_client.request_async( @@ -402,7 +398,6 @@ async def on_keep_alive_response(response: CompletionMessage) -> None: logger.error(f"Error during keep-alive: {e}") await asyncio.sleep(60) - async def _on_runtime_abort(self) -> None: """ Sends a runtime abort signalr to terminate all connected sessions. @@ -418,7 +413,7 @@ async def _on_runtime_abort(self) -> None: ) else: logger.error( - f"Error sending runtime abort signalr to UiPath MCP Server: {response.status_code} - {response.text}" + f"Error sending runtime abort signal to UiPath MCP Server: {response.status_code} - {response.text}" ) except Exception as e: logger.error(