Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "uipath-mcp"
version = "0.0.77"
version = "0.0.78"
description = "UiPath MCP SDK"
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.10"
Expand Down
40 changes: 40 additions & 0 deletions src/uipath_mcp/_cli/_runtime/_context.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from enum import Enum
from typing import Optional

from uipath._cli._runtime._contracts import UiPathRuntimeContext
Expand All @@ -7,4 +8,43 @@

class UiPathMcpRuntimeContext(UiPathRuntimeContext):
"""Context information passed throughout the runtime execution."""

config: Optional[McpConfig] = None


class UiPathServerType(Enum):
"""Defines the different types of UiPath servers used in the MCP ecosystem.

This enum is used to identify and configure the behavior of different server types
during runtime registration and execution.

Attributes:
UiPath (0): Standard UiPath server for Processes, Agents, and Activities
External (1): External server types like npx, uvx
Local (2): Local MCP server (PackageType.MCPServer)
Hosted (3): Tunnel to externally hosted server
"""

UiPath = 0 # type: int # Processes, Agents, Activities
External = 1 # type: int # npx, uvx
Local = 2 # type: int # PackageType.MCPServer
Hosted = 3 # type: int # tunnel to externally hosted server

@classmethod
def from_string(cls, name: str) -> "UiPathServerType":
"""Get enum value from string name."""
try:
return cls[name]
except KeyError as e:
raise ValueError(f"Unknown server type: {name}") from e

@classmethod
def get_description(cls, server_type: "UiPathServerType") -> str:
"""Get description for a server type."""
descriptions = {
cls.UiPath: "Standard UiPath server for Processes, Agents, and Activities",
cls.External: "External server types like npx, uvx",
cls.Local: "Local MCP server (PackageType.MCPServer)",
cls.Hosted: "Tunnel to externally hosted server",
}
return descriptions.get(server_type, "Unknown server type")
72 changes: 57 additions & 15 deletions src/uipath_mcp/_cli/_runtime/_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from uipath.tracing import wait_for_tracers

from .._utils._config import McpServer
from ._context import UiPathMcpRuntimeContext
from ._context import UiPathMcpRuntimeContext, UiPathServerType
from ._exception import UiPathMcpRuntimeError
from ._session import SessionServer
from ._stdio_client import stdio_client
Expand All @@ -37,7 +37,9 @@ def __init__(self, context: UiPathMcpRuntimeContext):
super().__init__(context)
self.context: UiPathMcpRuntimeContext = context
self._server: Optional[McpServer] = None
self._runtime_id = self.context.job_id if self.context.job_id else str(uuid.uuid4())
self._runtime_id = (
self.context.job_id if self.context.job_id else str(uuid.uuid4())
)
self._signalr_client: Optional[SignalRClient] = None
self._session_servers: Dict[str, SessionServer] = {}
self._session_output: Optional[str] = None
Expand Down Expand Up @@ -152,9 +154,7 @@ async def cleanup(self) -> None:
try:
await session_server.stop()
except Exception as e:
logger.error(
f"Error cleaning up session server {session_id}: {str(e)}"
)
logger.error(f"Error cleaning up session server {session_id}: {str(e)}")

if self._signalr_client and hasattr(self._signalr_client, "_transport"):
transport = self._signalr_client._transport
Expand Down Expand Up @@ -265,7 +265,7 @@ async def _register(self) -> None:

# Start a temporary stdio client to get tools
# Use a temporary file to capture stderr
with tempfile.TemporaryFile(mode='w+b') as stderr_temp:
with tempfile.TemporaryFile(mode="w+b") as stderr_temp:
async with stdio_client(server_params, errlog=stderr_temp) as (
read,
write,
Expand All @@ -285,7 +285,9 @@ async def _register(self) -> None:
logger.error("Initialization timed out")
# Capture stderr output here, after the timeout
stderr_temp.seek(0)
server_stderr_output = stderr_temp.read().decode('utf-8', errors='replace')
server_stderr_output = stderr_temp.read().decode(
"utf-8", errors="replace"
)
# We'll handle this after exiting the context managers
# We don't continue with registration here - we'll do it after the context managers

Expand Down Expand Up @@ -314,14 +316,13 @@ async def _register(self) -> None:
"Name": self._server.name,
"Slug": self._server.name,
"Version": "1.0.0",
"Type": 1 if self.sandboxed else 3,
"Type": self.server_type.value,
},
"tools": [],
}

for tool in tools_result.tools:
tool_info = {
"Type": 1,
"Name": tool.name,
"ProcessType": "Tool",
"Description": tool.description,
Expand All @@ -347,7 +348,7 @@ async def _register(self) -> None:
async def _on_session_start_error(self, session_id: str) -> None:
"""
Sends a dummy initialization failure message to abort the already connected client.
Sanboxed runtimes are triggered by new client connections.
Sandboxed runtimes are triggered by new client connections.
"""
try:
response = await self._uipath.api_client.request_async(
Expand Down Expand Up @@ -382,6 +383,7 @@ async def _keep_alive(self) -> None:
"""
while not self._cancel_event.is_set():
try:

async def on_keep_alive_response(response: CompletionMessage) -> None:
if response.error:
logger.error(f"Error during keep-alive: {response.error}")
Expand All @@ -391,27 +393,33 @@ async def on_keep_alive_response(response: CompletionMessage) -> None:
# If there are no active sessions and this is a sandbox environment
# We need to cancel the runtime
# eg: when user kills the agent that triggered the runtime, before we subscribe to events
if not session_ids and self.sandboxed and not self._cancel_event.is_set():
logger.error("No active sessions, cancelling sandboxed runtime...")
if (
not session_ids
and self.sandboxed
and not self._cancel_event.is_set()
):
logger.error(
"No active sessions, cancelling sandboxed runtime..."
)
self._cancel_event.set()

await self._signalr_client.send(
method="OnKeepAlive",
arguments=[],
on_invocation=on_keep_alive_response
on_invocation=on_keep_alive_response,
)
except Exception as e:
logger.error(f"Error during keep-alive: {e}")
await asyncio.sleep(60)


async def _on_runtime_abort(self) -> None:
"""
Sends a runtime abort signalr to terminate all connected sessions.
"""
try:
response = await self._uipath.api_client.request_async(
"POST",
f"mcp_/mcp/{self._server.name}/runtime/abort?runtimeId={self._runtime_id}"
f"mcp_/mcp/{self._server.name}/runtime/abort?runtimeId={self._runtime_id}",
)
if response.status_code == 202:
logger.info(
Expand All @@ -435,3 +443,37 @@ def sandboxed(self) -> bool:
bool: True if this is an sandboxed runtime (has a job_id), False otherwise.
"""
return self.context.job_id is not None

@property
def packaged(self) -> bool:
"""
Check if the runtime is packaged (PackageType.MCPServer).

Returns:
bool: True if this is a packaged runtime (has a process), False otherwise.
"""
process_key = self.context.trace_context.process_key

return (
process_key is not None
and process_key != "00000000-0000-0000-0000-000000000000"
)

@property
def server_type(self) -> UiPathServerType:
"""
Determine the correct UiPathServerType for this runtime.

Returns:
UiPathServerType: The appropriate server type enum value based on the runtime configuration.
"""
if self.packaged:
# If it's a packaged runtime (has a process_key), it's a Local server
# Packaged runtimes are also sandboxed
return UiPathServerType.Local
elif self.sandboxed:
# If it's sandboxed but not packaged, it's an External server
return UiPathServerType.External
else:
# If it's neither packaged nor sandboxed, it's a Hosted server
return UiPathServerType.Hosted