Skip to content

Commit 409f0a5

Browse files
authored
feat(instrumentation): Adding MCP opentelemetry-instrumentation into traceloop (#2829)
1 parent 57053f1 commit 409f0a5

File tree

15 files changed

+1987
-28
lines changed

15 files changed

+1987
-28
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
[flake8]
2+
exclude =
3+
.git,
4+
__pycache__,
5+
build,
6+
dist,
7+
.tox,
8+
venv,
9+
.venv,
10+
.pytest_cache
11+
max-line-length = 120
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
3.12.6
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
OpenTelemetry MCP Instrumentation
2+
3+
<a href="https://pypi.org/project/opentelemetry-instrumentation-mcp/">
4+
<img src="https://badge.fury.io/py/opentelemetry-instrumentation-mcp.svg">
5+
</a>
6+
7+
This library allows tracing of agentic workflows implemented with MCP framework [mcp python sdk](https://github.com/modelcontextprotocol/python-sdk).
8+
9+
## Installation
10+
11+
```bash
12+
pip install opentelemetry-instrumentation-mcp
13+
```
14+
15+
## Example usage
16+
17+
```python
18+
from opentelemetry.instrumentation.mcp import McpInstrumentor
19+
20+
McpInstrumentor().instrument()
21+
```
22+
23+
## Privacy
24+
25+
**By default, this instrumentation logs prompts, completions, and embeddings to span attributes**. This gives you a clear visibility into how your LLM application tool usage is working, and can make it easy to debug and evaluate the tool usage.
26+
27+
However, you may want to disable this logging for privacy reasons, as they may contain highly sensitive data from your users. You may also simply want to reduce the size of your traces.
28+
29+
To disable logging, set the `TRACELOOP_TRACE_CONTENT` environment variable to `false`.
30+
31+
```bash
32+
TRACELOOP_TRACE_CONTENT=false
33+
```
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
from opentelemetry.instrumentation.mcp.version import __version__
2+
from opentelemetry.instrumentation.mcp.instrumentation import McpInstrumentor
3+
4+
__all__ = ["McpInstrumentor", "__version__"]
Lines changed: 317 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,317 @@
1+
from contextlib import asynccontextmanager
2+
from dataclasses import dataclass
3+
from typing import Any, AsyncGenerator, Callable, Collection, Tuple, cast
4+
import json
5+
import mcp
6+
7+
from opentelemetry import context, propagate
8+
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
9+
from opentelemetry.instrumentation.utils import unwrap
10+
from opentelemetry.trace import get_tracer
11+
from wrapt import ObjectProxy, register_post_import_hook, wrap_function_wrapper
12+
from opentelemetry.trace.status import Status, StatusCode
13+
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
14+
from opentelemetry.trace.propagation import set_span_in_context
15+
from opentelemetry.semconv_ai import SpanAttributes
16+
17+
from opentelemetry.instrumentation.mcp.version import __version__
18+
19+
_instruments = ("mcp >= 1.6.0",)
20+
21+
22+
class McpInstrumentor(BaseInstrumentor):
23+
def instrumentation_dependencies(self) -> Collection[str]:
24+
return _instruments
25+
26+
def _instrument(self, **kwargs):
27+
tracer_provider = kwargs.get("tracer_provider")
28+
tracer = get_tracer(__name__, __version__, tracer_provider)
29+
30+
register_post_import_hook(
31+
lambda _: wrap_function_wrapper(
32+
"mcp.client.sse", "sse_client", self._transport_wrapper(tracer)
33+
),
34+
"mcp.client.sse",
35+
)
36+
register_post_import_hook(
37+
lambda _: wrap_function_wrapper(
38+
"mcp.server.sse",
39+
"SseServerTransport.connect_sse",
40+
self._transport_wrapper(tracer),
41+
),
42+
"mcp.server.sse",
43+
)
44+
register_post_import_hook(
45+
lambda _: wrap_function_wrapper(
46+
"mcp.client.stdio", "stdio_client", self._transport_wrapper(tracer)
47+
),
48+
"mcp.client.stdio",
49+
)
50+
register_post_import_hook(
51+
lambda _: wrap_function_wrapper(
52+
"mcp.server.stdio", "stdio_server", self._transport_wrapper(tracer)
53+
),
54+
"mcp.server.stdio",
55+
)
56+
register_post_import_hook(
57+
lambda _: wrap_function_wrapper(
58+
"mcp.server.session",
59+
"ServerSession.__init__",
60+
self._base_session_init_wrapper(tracer),
61+
),
62+
"mcp.server.session",
63+
)
64+
wrap_function_wrapper(
65+
"mcp.shared.session",
66+
"BaseSession.send_request",
67+
self.patch_mcp_client(tracer),
68+
)
69+
70+
def _uninstrument(self, **kwargs):
71+
unwrap("mcp.client.stdio", "stdio_client")
72+
unwrap("mcp.server.stdio", "stdio_server")
73+
74+
def _transport_wrapper(self, tracer):
75+
@asynccontextmanager
76+
async def traced_method(
77+
wrapped: Callable[..., Any], instance: Any, args: Any, kwargs: Any
78+
) -> AsyncGenerator[
79+
Tuple["InstrumentedStreamReader", "InstrumentedStreamWriter"], None
80+
]:
81+
async with wrapped(*args, **kwargs) as (read_stream, write_stream):
82+
yield InstrumentedStreamReader(
83+
read_stream, tracer
84+
), InstrumentedStreamWriter(write_stream, tracer)
85+
86+
return traced_method
87+
88+
def _base_session_init_wrapper(self, tracer):
89+
def traced_method(
90+
wrapped: Callable[..., None], instance: Any, args: Any, kwargs: Any
91+
) -> None:
92+
wrapped(*args, **kwargs)
93+
reader = getattr(instance, "_incoming_message_stream_reader", None)
94+
writer = getattr(instance, "_incoming_message_stream_writer", None)
95+
if reader and writer:
96+
setattr(
97+
instance,
98+
"_incoming_message_stream_reader",
99+
ContextAttachingStreamReader(reader, tracer),
100+
)
101+
setattr(
102+
instance,
103+
"_incoming_message_stream_writer",
104+
ContextSavingStreamWriter(writer, tracer),
105+
)
106+
107+
return traced_method
108+
109+
def patch_mcp_client(self, tracer):
110+
def traced_method(wrapped, instance, args, kwargs):
111+
if len(args) < 1:
112+
return
113+
meta = None
114+
method = None
115+
params = None
116+
if hasattr(args[0].root, "method"):
117+
method = args[0].root.method
118+
if hasattr(args[0].root, "params"):
119+
params = args[0].root.params
120+
if params is None:
121+
args[0].root.params = mcp.types.RequestParams()
122+
meta = {}
123+
else:
124+
if hasattr(args[0].root.params, "meta"):
125+
meta = args[0].root.params.meta
126+
if meta is None:
127+
meta = {}
128+
129+
with tracer.start_as_current_span(f"{method}") as span:
130+
span.set_attribute(SpanAttributes.MCP_METHOD_NAME, f"{method}")
131+
span.set_attribute(
132+
SpanAttributes.MCP_REQUEST_ARGUMENT, f"{serialize(args[0])}"
133+
)
134+
ctx = set_span_in_context(span)
135+
TraceContextTextMapPropagator().inject(meta, ctx)
136+
args[0].root.params.meta = meta
137+
try:
138+
result = wrapped(*args, **kwargs)
139+
if result:
140+
span.set_status(Status(StatusCode.OK))
141+
return result
142+
except Exception as e:
143+
span.record_exception(e)
144+
span.set_status(Status(StatusCode.ERROR, str(e)))
145+
raise
146+
147+
return traced_method
148+
149+
150+
def serialize(request, depth=0, max_depth=4):
151+
"""Serialize input args to MCP server into JSON.
152+
The function accepts input object and converts into JSON
153+
keeping depth in mind to prevent creating large nested JSON"""
154+
if depth > max_depth:
155+
return {}
156+
depth += 1
157+
158+
def is_serializable(request):
159+
try:
160+
json.dumps(request)
161+
return True
162+
except Exception:
163+
return False
164+
165+
if is_serializable(request):
166+
return json.dumps(request)
167+
else:
168+
result = {}
169+
try:
170+
if hasattr(request, "__dict__"):
171+
for attrib in request.__dict__:
172+
if not attrib.startswith("_"):
173+
if type(request.__dict__[attrib]) in [
174+
bool,
175+
str,
176+
int,
177+
float,
178+
type(None),
179+
]:
180+
result[str(attrib)] = request.__dict__[attrib]
181+
else:
182+
result[str(attrib)] = serialize(
183+
request.__dict__[attrib], depth
184+
)
185+
except Exception:
186+
pass
187+
return json.dumps(result)
188+
189+
190+
class InstrumentedStreamReader(ObjectProxy): # type: ignore
191+
# ObjectProxy missing context manager - https://github.com/GrahamDumpleton/wrapt/issues/73
192+
def __init__(self, wrapped, tracer):
193+
super().__init__(wrapped)
194+
self._tracer = tracer
195+
196+
async def __aenter__(self) -> Any:
197+
return await self.__wrapped__.__aenter__()
198+
199+
async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> Any:
200+
return await self.__wrapped__.__aexit__(exc_type, exc_value, traceback)
201+
202+
async def __aiter__(self) -> AsyncGenerator[Any, None]:
203+
from mcp.types import JSONRPCMessage, JSONRPCRequest
204+
205+
async for item in self.__wrapped__:
206+
request = cast(JSONRPCMessage, item).root
207+
if not isinstance(request, JSONRPCRequest):
208+
yield item
209+
continue
210+
211+
if request.params:
212+
meta = request.params.get("_meta")
213+
if meta:
214+
ctx = propagate.extract(meta)
215+
restore = context.attach(ctx)
216+
try:
217+
yield item
218+
continue
219+
finally:
220+
context.detach(restore)
221+
yield item
222+
223+
224+
class InstrumentedStreamWriter(ObjectProxy): # type: ignore
225+
# ObjectProxy missing context manager - https://github.com/GrahamDumpleton/wrapt/issues/73
226+
def __init__(self, wrapped, tracer):
227+
super().__init__(wrapped)
228+
self._tracer = tracer
229+
230+
async def __aenter__(self) -> Any:
231+
return await self.__wrapped__.__aenter__()
232+
233+
async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> Any:
234+
return await self.__wrapped__.__aexit__(exc_type, exc_value, traceback)
235+
236+
async def send(self, item: Any) -> Any:
237+
from mcp.types import JSONRPCMessage, JSONRPCRequest
238+
239+
request = cast(JSONRPCMessage, item).root
240+
241+
with self._tracer.start_as_current_span("ResponseStreamWriter") as span:
242+
if hasattr(request, "result"):
243+
span.set_attribute(
244+
SpanAttributes.MCP_RESPONSE_VALUE, f"{serialize(request.result)}"
245+
)
246+
if hasattr(request, "id"):
247+
span.set_attribute(SpanAttributes.MCP_REQUEST_ID, f"{request.id}")
248+
249+
if not isinstance(request, JSONRPCRequest):
250+
return await self.__wrapped__.send(item)
251+
meta = None
252+
if not request.params:
253+
request.params = {}
254+
meta = request.params.setdefault("_meta", {})
255+
256+
propagate.get_global_textmap().inject(meta)
257+
return await self.__wrapped__.send(item)
258+
259+
260+
@dataclass(slots=True, frozen=True)
261+
class ItemWithContext:
262+
item: Any
263+
ctx: context.Context
264+
265+
266+
class ContextSavingStreamWriter(ObjectProxy): # type: ignore
267+
# ObjectProxy missing context manager - https://github.com/GrahamDumpleton/wrapt/issues/73
268+
def __init__(self, wrapped, tracer):
269+
super().__init__(wrapped)
270+
self._tracer = tracer
271+
272+
async def __aenter__(self) -> Any:
273+
return await self.__wrapped__.__aenter__()
274+
275+
async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> Any:
276+
return await self.__wrapped__.__aexit__(exc_type, exc_value, traceback)
277+
278+
async def send(self, item: Any) -> Any:
279+
with self._tracer.start_as_current_span("RequestStreamWriter") as span:
280+
if hasattr(item, "request_id"):
281+
span.set_attribute(SpanAttributes.MCP_REQUEST_ID, f"{item.request_id}")
282+
if hasattr(item, "request"):
283+
if hasattr(item.request, "root"):
284+
if hasattr(item.request.root, "method"):
285+
span.set_attribute(
286+
SpanAttributes.MCP_METHOD_NAME,
287+
f"{item.request.root.method}",
288+
)
289+
if hasattr(item.request.root, "params"):
290+
span.set_attribute(
291+
SpanAttributes.MCP_REQUEST_ARGUMENT,
292+
f"{serialize(item.request.root.params)}",
293+
)
294+
ctx = context.get_current()
295+
return await self.__wrapped__.send(ItemWithContext(item, ctx))
296+
297+
298+
class ContextAttachingStreamReader(ObjectProxy): # type: ignore
299+
# ObjectProxy missing context manager - https://github.com/GrahamDumpleton/wrapt/issues/73
300+
def __init__(self, wrapped, tracer):
301+
super().__init__(wrapped)
302+
self._tracer = tracer
303+
304+
async def __aenter__(self) -> Any:
305+
return await self.__wrapped__.__aenter__()
306+
307+
async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> Any:
308+
return await self.__wrapped__.__aexit__(exc_type, exc_value, traceback)
309+
310+
async def __aiter__(self) -> AsyncGenerator[Any, None]:
311+
async for item in self.__wrapped__:
312+
item_with_context = cast(ItemWithContext, item)
313+
restore = context.attach(item_with_context.ctx)
314+
try:
315+
yield item_with_context.item
316+
finally:
317+
context.detach(restore)
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
__version__ = "0.39.1"

0 commit comments

Comments
 (0)