Skip to content

Commit 1063981

Browse files
author
undertaker86001
committed
refactor code of comment
1 parent 01a946e commit 1063981

File tree

4 files changed

+337
-17
lines changed

4 files changed

+337
-17
lines changed
Lines changed: 295 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,295 @@
1+
import asyncio
2+
import logging
3+
import time
4+
from typing import Any, Dict, Optional
5+
6+
from opentelemetry import trace
7+
from opentelemetry.semconv.trace import SpanAttributes
8+
from opentelemetry.trace import SpanKind, Status, StatusCode
9+
from .utils import sanitize_tool_name, is_content_enabled, is_input_capture_enabled
10+
11+
logger = logging.getLogger(__name__)
12+
13+
14+
# MCP 特定的语义约定 - 根据OpenTelemetry官方规范
15+
class MCPAttributes:
16+
# 核心MCP属性
17+
MCP_METHOD_NAME = "mcp.method.name"
18+
MCP_TOOL_NAME = "mcp.tool.name"
19+
MCP_RESOURCE_URI = "mcp.resource.uri"
20+
MCP_RESOURCE_SIZE = "mcp.resource.size"
21+
22+
# 消息大小相关属性
23+
MCP_MESSAGE_SIZE = "mcp.message.size"
24+
MCP_REQUEST_SIZE = "mcp.request.size"
25+
MCP_RESPONSE_SIZE = "mcp.response.size"
26+
27+
# 协议和连接信息
28+
MCP_PROTOCOL_VERSION = "mcp.protocol.version"
29+
MCP_SERVER_ADDRESS = "server.address"
30+
MCP_SERVER_PORT = "server.port"
31+
32+
# 工具调用相关属性
33+
MCP_TOOL_ARGUMENTS = "mcp.tool.arguments"
34+
MCP_TOOL_RESULT = "mcp.tool.result"
35+
36+
# 错误相关属性
37+
MCP_ERROR_CODE = "mcp.error.code"
38+
MCP_ERROR_MESSAGE = "mcp.error.message"
39+
MCP_ERROR_TYPE = "mcp.error.type"
40+
41+
# 内容相关属性
42+
MCP_CONTENT_COUNT = "mcp.content.count"
43+
MCP_CONTENT_TYPES = "mcp.content.types"
44+
MCP_CONTENTS_COUNT = "mcp.contents.count"
45+
MCP_CONTENTS_TYPES = "mcp.contents.types"
46+
47+
48+
def _calculate_message_size(obj) -> int:
49+
"""计算消息大小(字节)- 暂时简化实现,后续优化"""
50+
try:
51+
if obj is None:
52+
return 0
53+
if isinstance(obj, (str, bytes)):
54+
return len(obj)
55+
# 暂时不记录复杂对象的大小,避免json dumps的hack方式
56+
# 后续可以通过更合适的方式来实现
57+
return 0
58+
except Exception:
59+
return 0
60+
61+
62+
def _extract_tool_arguments(args, kwargs) -> Dict[str, Any]:
63+
"""提取工具调用参数"""
64+
try:
65+
if len(args) > 1:
66+
return {"arguments": args[1]}
67+
elif 'arguments' in kwargs:
68+
return {"arguments": kwargs['arguments']}
69+
return {}
70+
except Exception:
71+
return {}
72+
73+
74+
def _extract_response_details(result) -> Dict[str, Any]:
75+
"""提取响应详情"""
76+
try:
77+
if result is None:
78+
return {"response_type": "null", "response_size": 0}
79+
80+
response_size = _calculate_message_size(result)
81+
response_type = type(result).__name__
82+
83+
details = {
84+
"response_type": response_type,
85+
"response_size": response_size
86+
}
87+
88+
# 如果是工具调用结果,提取更多信息
89+
if hasattr(result, 'content'):
90+
details["content_count"] = len(result.content) if result.content else 0
91+
if result.content:
92+
details["content_types"] = [type(item).__name__ for item in result.content]
93+
94+
return details
95+
except Exception:
96+
return {"response_type": "unknown", "response_size": 0}
97+
98+
99+
def async_mcp_client_initialize(tracer, event_logger, instruments):
100+
"""异步MCP客户端初始化包装器"""
101+
def wrapper(wrapped, instance, args, kwargs):
102+
async def async_wrapper(*a, **kw):
103+
# 在wrapper内部动态检查是否启用内容捕获
104+
capture_content = is_content_enabled()
105+
106+
# 根据OpenTelemetry规范:mcp.client.{method}
107+
with tracer.start_as_current_span(
108+
"mcp.client.initialize",
109+
kind=SpanKind.CLIENT,
110+
) as span:
111+
if span.is_recording():
112+
span.set_attribute(MCPAttributes.MCP_METHOD_NAME, "initialize")
113+
114+
# 记录协议版本信息
115+
if hasattr(instance, '_protocol_version'):
116+
span.set_attribute(MCPAttributes.MCP_PROTOCOL_VERSION, str(instance._protocol_version))
117+
118+
# 记录连接信息
119+
if hasattr(instance, '_server_params'):
120+
server_params = instance._server_params
121+
if hasattr(server_params, 'command'):
122+
span.set_attribute(MCPAttributes.MCP_SERVER_ADDRESS, str(server_params.command))
123+
124+
# 分离instrumentation异常处理和业务逻辑异常处理
125+
try:
126+
start_time = time.time()
127+
result = await wrapped(*a, **kw)
128+
duration = time.time() - start_time
129+
130+
# 记录操作指标
131+
instruments.operation_duration.record(duration, {"mcp.method.name": "initialize"})
132+
instruments.operation_count.add(1, {"mcp.method.name": "initialize", "status": "success"})
133+
134+
if span.is_recording():
135+
span.set_status(Status(StatusCode.OK))
136+
137+
# 记录响应详情
138+
response_details = _extract_response_details(result)
139+
span.set_attribute(MCPAttributes.MCP_RESPONSE_SIZE, response_details["response_size"])
140+
span.set_attribute("mcp.response.type", response_details["response_type"])
141+
142+
# 如果启用内容捕获,记录更多详情
143+
if capture_content and result:
144+
span.set_attribute("mcp.initialize.result", str(result)[:1000])
145+
146+
return result
147+
except Exception as e:
148+
# 记录错误指标
149+
instruments.operation_count.add(1, {"mcp.method.name": "initialize", "status": "error"})
150+
if span.is_recording():
151+
span.set_status(Status(StatusCode.ERROR, str(e)))
152+
span.record_exception(e)
153+
154+
# 记录详细错误信息
155+
span.set_attribute(MCPAttributes.MCP_ERROR_MESSAGE, str(e))
156+
span.set_attribute(MCPAttributes.MCP_ERROR_TYPE, type(e).__name__)
157+
if hasattr(e, 'code'):
158+
span.set_attribute(MCPAttributes.MCP_ERROR_CODE, str(e.code))
159+
160+
raise
161+
return async_wrapper(*args, **kwargs)
162+
return wrapper
163+
164+
165+
def async_mcp_client_list_tools(tracer, event_logger, instruments):
166+
"""异步MCP客户端列出工具包装器"""
167+
def wrapper(wrapped, instance, args, kwargs):
168+
async def async_wrapper(*a, **kw):
169+
# 在wrapper内部动态检查是否启用内容捕获
170+
capture_content = is_content_enabled()
171+
172+
# 根据OpenTelemetry规范:mcp.client.{method}
173+
with tracer.start_as_current_span(
174+
"mcp.client.list_tools",
175+
kind=SpanKind.CLIENT,
176+
) as span:
177+
if span.is_recording():
178+
span.set_attribute(MCPAttributes.MCP_METHOD_NAME, "list_tools")
179+
180+
try:
181+
start_time = time.time()
182+
result = await wrapped(*a, **kw)
183+
duration = time.time() - start_time
184+
185+
# 记录操作指标
186+
instruments.operation_duration.record(duration, {"mcp.method.name": "list_tools"})
187+
instruments.operation_count.add(1, {"mcp.method.name": "list_tools", "status": "success"})
188+
189+
if span.is_recording():
190+
span.set_status(Status(StatusCode.OK))
191+
192+
# 记录响应详情
193+
response_details = _extract_response_details(result)
194+
span.set_attribute(MCPAttributes.MCP_RESPONSE_SIZE, response_details["response_size"])
195+
span.set_attribute("mcp.response.type", response_details["response_type"])
196+
197+
# 记录工具列表信息
198+
if result and hasattr(result, 'tools'):
199+
span.set_attribute("mcp.tools.count", len(result.tools))
200+
if capture_content:
201+
tool_names = [tool.name for tool in result.tools] if result.tools else []
202+
span.set_attribute("mcp.tools.list", str(tool_names)[:500])
203+
204+
return result
205+
except Exception as e:
206+
instruments.operation_count.add(1, {"mcp.method.name": "list_tools", "status": "error"})
207+
if span.is_recording():
208+
span.set_status(Status(StatusCode.ERROR, str(e)))
209+
span.record_exception(e)
210+
span.set_attribute(MCPAttributes.MCP_ERROR_MESSAGE, str(e))
211+
span.set_attribute(MCPAttributes.MCP_ERROR_TYPE, type(e).__name__)
212+
raise
213+
return async_wrapper(*args, **kwargs)
214+
return wrapper
215+
216+
217+
def async_mcp_client_call_tool(tracer, event_logger, instruments):
218+
"""异步MCP客户端调用工具包装器"""
219+
def wrapper(wrapped, instance, args, kwargs):
220+
async def async_wrapper(*a, **kw):
221+
tool_name = args[0] if args else kwargs.get('name', 'unknown')
222+
sanitized_tool_name = sanitize_tool_name(tool_name)
223+
224+
# 在wrapper内部动态检查是否启用内容捕获和输入参数捕获
225+
capture_content = is_content_enabled()
226+
capture_input = is_input_capture_enabled()
227+
228+
# 根据OpenTelemetry规范:mcp.client.{method}
229+
with tracer.start_as_current_span(
230+
"mcp.client.call_tool",
231+
kind=SpanKind.CLIENT,
232+
) as span:
233+
if span.is_recording():
234+
span.set_attribute(MCPAttributes.MCP_METHOD_NAME, "call_tool")
235+
span.set_attribute(MCPAttributes.MCP_TOOL_NAME, tool_name)
236+
237+
# 根据环境变量开关决定是否记录请求参数详情
238+
if capture_input:
239+
tool_args = _extract_tool_arguments(args, kwargs)
240+
if tool_args:
241+
span.set_attribute(MCPAttributes.MCP_TOOL_ARGUMENTS, str(tool_args)[:1000])
242+
span.set_attribute(MCPAttributes.MCP_REQUEST_SIZE, _calculate_message_size(tool_args))
243+
244+
# 如果启用内容捕获,记录工具参数
245+
if capture_content and len(args) > 1:
246+
span.set_attribute("mcp.tool.arguments.detailed", str(args[1])[:1000])
247+
248+
try:
249+
start_time = time.time()
250+
result = await wrapped(*a, **kw)
251+
duration = time.time() - start_time
252+
253+
# 记录操作指标
254+
instruments.operation_duration.record(duration, {
255+
"mcp.method.name": "call_tool",
256+
"mcp.tool.name": tool_name
257+
})
258+
instruments.operation_count.add(1, {
259+
"mcp.method.name": "call_tool",
260+
"mcp.tool.name": tool_name,
261+
"status": "success"
262+
})
263+
264+
if span.is_recording():
265+
span.set_status(Status(StatusCode.OK))
266+
267+
# 记录响应详情
268+
response_details = _extract_response_details(result)
269+
span.set_attribute(MCPAttributes.MCP_RESPONSE_SIZE, response_details["response_size"])
270+
span.set_attribute("mcp.response.type", response_details["response_type"])
271+
272+
if capture_content and result:
273+
span.set_attribute(MCPAttributes.MCP_TOOL_RESULT, str(result)[:1000])
274+
275+
# 记录内容详情
276+
if hasattr(result, 'content') and result.content:
277+
span.set_attribute(MCPAttributes.MCP_CONTENT_COUNT, len(result.content))
278+
content_types = [type(item).__name__ for item in result.content]
279+
span.set_attribute(MCPAttributes.MCP_CONTENT_TYPES, str(content_types)[:500])
280+
281+
return result
282+
except Exception as e:
283+
instruments.operation_count.add(1, {
284+
"mcp.method.name": "call_tool",
285+
"mcp.tool.name": tool_name,
286+
"status": "error"
287+
})
288+
if span.is_recording():
289+
span.set_status(Status(StatusCode.ERROR, str(e)))
290+
span.record_exception(e)
291+
span.set_attribute(MCPAttributes.MCP_ERROR_MESSAGE, str(e))
292+
span.set_attribute(MCPAttributes.MCP_ERROR_TYPE, type(e).__name__)
293+
raise
294+
return async_wrapper(*args, **kwargs)
295+
return wrapper

opentelemetry-instrumentation/src/opentelemetry/instrumentation/mcp/__init__.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,31 +39,32 @@ def _instrument(self, **kwargs):
3939
meter_provider = kwargs.get("meter_provider")
4040
self._meter = get_meter(__name__, "", meter_provider, schema_url=Schemas.V1_28_0.value)
4141
instruments = Instruments(self._meter)
42-
# 包装异步方法
42+
43+
# 包装异步方法,实现动态切换
4344
wrap_function_wrapper(
4445
module="mcp.client.session",
4546
name="ClientSession.initialize",
46-
wrapper=async_mcp_client_initialize(tracer, event_logger, instruments, is_content_enabled()),
47+
wrapper=async_mcp_client_initialize(tracer, event_logger, instruments),
4748
)
4849
wrap_function_wrapper(
4950
module="mcp.client.session",
5051
name="ClientSession.read_resource",
51-
wrapper=async_mcp_client_read_resource(tracer, event_logger, instruments, is_content_enabled()),
52+
wrapper=async_mcp_client_read_resource(tracer, event_logger, instruments),
5253
)
5354
wrap_function_wrapper(
5455
module="mcp.client.session",
5556
name="ClientSession.call_tool",
56-
wrapper=async_mcp_client_call_tool(tracer, event_logger, instruments, is_content_enabled()),
57+
wrapper=async_mcp_client_call_tool(tracer, event_logger, instruments),
5758
)
5859
wrap_function_wrapper(
5960
module="mcp.client.session",
6061
name="ClientSession.list_tools",
61-
wrapper=async_mcp_client_list_tools(tracer, event_logger, instruments, is_content_enabled()),
62+
wrapper=async_mcp_client_list_tools(tracer, event_logger, instruments),
6263
)
6364
wrap_function_wrapper(
6465
module="mcp.client.session",
6566
name="ClientSession.send_ping",
66-
wrapper=async_mcp_client_send_ping(tracer, event_logger, instruments, is_content_enabled()),
67+
wrapper=async_mcp_client_send_ping(tracer, event_logger, instruments),
6768
)
6869

6970
def _uninstrument(self, **kwargs):

0 commit comments

Comments
 (0)