66from mcp .client .stdio import stdio_client
77from opentelemetry import trace
88from uipath import UiPath
9- from uipath .tracing import traced , wait_for_tracers
9+ from uipath .tracing import wait_for_tracers
1010
1111from .._utils ._config import McpServer
1212from ._logger import LoggerAdapter
13+ from ._tracer import McpTracer
1314
1415logger = logging .getLogger (__name__ )
1516tracer = trace .get_tracer (__name__ )
@@ -28,6 +29,7 @@ def __init__(self, server_config: McpServer, session_id: str):
2829 self .context_task = None
2930 self ._message_queue = asyncio .Queue ()
3031 self ._uipath = UiPath ()
32+ self ._mcp_tracer = McpTracer (tracer , logger )
3133
3234 async def start (self ) -> None :
3335 """Start the server process in a separate task."""
@@ -59,39 +61,33 @@ async def _run_server(self, server_params: StdioServerParameters) -> None:
5961 """Run the server in proper context managers."""
6062 logger .info (f"Starting server process for session { self .session_id } " )
6163 try :
62- stderr_adapter = LoggerAdapter (logger )
63- with tracer .start_as_current_span (self .server_config .name ) as root_span :
64- root_span .set_attribute ("session_id" , self .session_id )
65- root_span .set_attribute (
66- "server_params" , server_params .model_dump_json ()
67- )
68- async with stdio_client (server_params , errlog = stderr_adapter ) as (
69- read ,
70- write ,
71- ):
72- self .read_stream , self .write_stream = read , write
73- logger .info (f"Session { self .session_id } - stdio client started" )
64+ stderr_null = LoggerAdapter (logger )
7465
75- logger .info (f"Session { self .session_id } - MCP session initialized" )
66+ async with stdio_client (server_params , errlog = stderr_null ) as (
67+ read ,
68+ write ,
69+ ):
70+ self .read_stream , self .write_stream = read , write
71+ logger .info (f"Session { self .session_id } - stdio client started" )
7672
77- # Start the message consumer task
78- consumer_task = asyncio .create_task (self ._consume_messages ())
73+ # Start the message consumer task
74+ consumer_task = asyncio .create_task (self ._consume_messages ())
7975
80- # Process incoming messages from the server
76+ # Process incoming messages from the server
77+ try :
78+ while True :
79+ print ("Waiting for messages..." )
80+ message = await self .read_stream .receive ()
81+ json_str = message .model_dump_json ()
82+ print (f"Received message from local server: { json_str } " )
83+ await self .send_outgoing_message (message )
84+ finally :
85+ # Cancel the consumer when we exit the loop
86+ consumer_task .cancel ()
8187 try :
82- while True :
83- print ("Waiting for messages..." )
84- message = await self .read_stream .receive ()
85- json_str = message .model_dump_json ()
86- print (f"Received message from local server: { json_str } " )
87- await self .send_outgoing_message (message )
88- finally :
89- # Cancel the consumer when we exit the loop
90- consumer_task .cancel ()
91- try :
92- await consumer_task
93- except asyncio .CancelledError :
94- pass
88+ await consumer_task
89+ except asyncio .CancelledError :
90+ pass
9591
9692 except Exception as e :
9793 logger .error (
@@ -163,7 +159,6 @@ async def send_message(self, message: types.JSONRPCMessage) -> None:
163159 await self ._message_queue .put (message )
164160 logger .debug (f"Session { self .session_id } - message queued for processing" )
165161
166- @traced ()
167162 async def get_incoming_messages (self ) -> None :
168163 """Get new messages from UiPath MCP Server."""
169164 response = self ._uipath .api_client .request (
@@ -175,23 +170,37 @@ async def get_incoming_messages(self) -> None:
175170 for message in messages :
176171 logger .info (f"Incoming message from UiPath MCP Server: { message } " )
177172 json_message = types .JSONRPCMessage .model_validate (message )
178- logger .info (f"Forwarding message to local MCP Server: { message } " )
179- await self .send_message (json_message )
173+ with self ._mcp_tracer .create_span_for_message (
174+ json_message ,
175+ session_id = self .session_id ,
176+ server_name = self .server_config .name
177+ ) as _ :
178+ logger .info (f"Forwarding message to local MCP Server: { message } " )
179+ await self .send_message (json_message )
180180
181- @traced ()
182181 async def send_outgoing_message (self , message : types .JSONRPCMessage ) -> None :
183182 """Send new message to UiPath MCP Server."""
184- response = self ._uipath .api_client .request (
185- "POST" ,
186- f"mcp_/mcp/{ self .server_config .name } /out/message?sessionId={ self .session_id } " ,
187- json = message .model_dump (),
188- )
189- if response .status_code == 202 :
190- logger .info (f"Outgoing message sent to UiPath MCP Server: { message } " )
191- else :
192- logger .error (
193- f"Failed to send outgoing message to UiPath MCP Server: { response .status_code } { response .text } "
194- )
183+ with self ._mcp_tracer .create_span_for_message (
184+ message ,
185+ session_id = self .session_id ,
186+ server_name = self .server_config .name
187+ ) as span :
188+ try :
189+ response = self ._uipath .api_client .request (
190+ "POST" ,
191+ f"mcp_/mcp/{ self .server_config .name } /out/message?sessionId={ self .session_id } " ,
192+ json = message .model_dump (),
193+ )
194+
195+ span .set_attribute ("http.status_code" , response .status_code )
196+
197+ if response .status_code == 202 :
198+ logger .info (f"Outgoing message sent to UiPath MCP Server: { message } " )
199+ else :
200+ self ._mcp_tracer .record_http_error (span , response .status_code , response .text )
201+ except Exception as e :
202+ self ._mcp_tracer .record_exception (span , e )
203+ raise
195204
196205 async def cleanup (self ) -> None :
197206 """Clean up resources and stop the server."""
0 commit comments