Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions packages/opentelemetry-instrumentation-mcp/.flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[flake8]
exclude =
.git,
__pycache__,
build,
dist,
.tox,
venv,
.venv,
.pytest_cache
max-line-length = 120
1 change: 1 addition & 0 deletions packages/opentelemetry-instrumentation-mcp/.python-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3.12.6
33 changes: 33 additions & 0 deletions packages/opentelemetry-instrumentation-mcp/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
OpenTelemetry MCP Instrumentation

<a href="https://pypi.org/project/opentelemetry-instrumentation-mcp/">
<img src="https://badge.fury.io/py/opentelemetry-instrumentation-mcp.svg">
</a>

This library allows tracing of agentic workflows implemented with MCP framework [mcp python sdk](https://github.com/modelcontextprotocol/python-sdk).

## Installation

```bash
pip install opentelemetry-instrumentation-mcp
```

## Example usage

```python
from opentelemetry.instrumentation.mcp import McpInstrumentor

McpInstrumentor().instrument()
```

## Privacy

**By default, this instrumentation logs prompts, completions, and embeddings to span attributes**. This gives you a clear visibility into how your LLM application tool usage is working, and can make it easy to debug and evaluate the tool usage.

However, you may want to disable this logging for privacy reasons, as they may contain highly sensitive data from your users. You may also simply want to reduce the size of your traces.

To disable logging, set the `TRACELOOP_TRACE_CONTENT` environment variable to `false`.

```bash
TRACELOOP_TRACE_CONTENT=false
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from opentelemetry.instrumentation.mcp.version import __version__
from opentelemetry.instrumentation.mcp.instrumentation import McpInstrumentor

__all__ = ["McpInstrumentor", "__version__"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,317 @@
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import Any, AsyncGenerator, Callable, Collection, Tuple, cast
import json
import mcp

from opentelemetry import context, propagate
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.trace import get_tracer
from wrapt import ObjectProxy, register_post_import_hook, wrap_function_wrapper
from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from opentelemetry.trace.propagation import set_span_in_context
from opentelemetry.semconv_ai import SpanAttributes

from opentelemetry.instrumentation.mcp.version import __version__

_instruments = ("mcp >= 1.6.0",)


class McpInstrumentor(BaseInstrumentor):
def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

def _instrument(self, **kwargs):
tracer_provider = kwargs.get("tracer_provider")
tracer = get_tracer(__name__, __version__, tracer_provider)

register_post_import_hook(
lambda _: wrap_function_wrapper(
"mcp.client.sse", "sse_client", self._transport_wrapper(tracer)
),
"mcp.client.sse",
)
register_post_import_hook(
lambda _: wrap_function_wrapper(
"mcp.server.sse",
"SseServerTransport.connect_sse",
self._transport_wrapper(tracer),
),
"mcp.server.sse",
)
register_post_import_hook(
lambda _: wrap_function_wrapper(
"mcp.client.stdio", "stdio_client", self._transport_wrapper(tracer)
),
"mcp.client.stdio",
)
register_post_import_hook(
lambda _: wrap_function_wrapper(
"mcp.server.stdio", "stdio_server", self._transport_wrapper(tracer)
),
"mcp.server.stdio",
)
register_post_import_hook(
lambda _: wrap_function_wrapper(
"mcp.server.session",
"ServerSession.__init__",
self._base_session_init_wrapper(tracer),
),
"mcp.server.session",
)
wrap_function_wrapper(
"mcp.shared.session",
"BaseSession.send_request",
self.patch_mcp_client(tracer),
)

def _uninstrument(self, **kwargs):
unwrap("mcp.client.stdio", "stdio_client")
unwrap("mcp.server.stdio", "stdio_server")

def _transport_wrapper(self, tracer):
@asynccontextmanager
async def traced_method(
wrapped: Callable[..., Any], instance: Any, args: Any, kwargs: Any
) -> AsyncGenerator[
Tuple["InstrumentedStreamReader", "InstrumentedStreamWriter"], None
]:
async with wrapped(*args, **kwargs) as (read_stream, write_stream):
yield InstrumentedStreamReader(
read_stream, tracer
), InstrumentedStreamWriter(write_stream, tracer)

return traced_method

def _base_session_init_wrapper(self, tracer):
def traced_method(
wrapped: Callable[..., None], instance: Any, args: Any, kwargs: Any
) -> None:
wrapped(*args, **kwargs)
reader = getattr(instance, "_incoming_message_stream_reader", None)
writer = getattr(instance, "_incoming_message_stream_writer", None)
if reader and writer:
setattr(
instance,
"_incoming_message_stream_reader",
ContextAttachingStreamReader(reader, tracer),
)
setattr(
instance,
"_incoming_message_stream_writer",
ContextSavingStreamWriter(writer, tracer),
)

return traced_method

def patch_mcp_client(self, tracer):
def traced_method(wrapped, instance, args, kwargs):
if len(args) < 1:
return
meta = None
method = None
params = None
if hasattr(args[0].root, "method"):
method = args[0].root.method
if hasattr(args[0].root, "params"):
params = args[0].root.params
if params is None:
args[0].root.params = mcp.types.RequestParams()
meta = {}
else:
if hasattr(args[0].root.params, "meta"):
meta = args[0].root.params.meta
if meta is None:
meta = {}

with tracer.start_as_current_span(f"{method}") as span:
span.set_attribute(SpanAttributes.MCP_METHOD_NAME, f"{method}")
span.set_attribute(
SpanAttributes.MCP_REQUEST_ARGUMENT, f"{serialize(args[0])}"
)
ctx = set_span_in_context(span)
TraceContextTextMapPropagator().inject(meta, ctx)
args[0].root.params.meta = meta
try:
result = wrapped(*args, **kwargs)
if result:
span.set_status(Status(StatusCode.OK))
return result
except Exception as e:
span.record_exception(e)
span.set_status(Status(StatusCode.ERROR, str(e)))
raise

return traced_method


def serialize(request, depth=0, max_depth=4):
"""Serialize input args to MCP server into JSON.
The function accepts input object and converts into JSON
keeping depth in mind to prevent creating large nested JSON"""
if depth > max_depth:
return {}
depth += 1

def is_serializable(request):
try:
json.dumps(request)
return True
except Exception:
return False

if is_serializable(request):
return json.dumps(request)
else:
result = {}
try:
if hasattr(request, "__dict__"):
for attrib in request.__dict__:
if not attrib.startswith("_"):
if type(request.__dict__[attrib]) in [
bool,
str,
int,
float,
type(None),
]:
result[str(attrib)] = request.__dict__[attrib]
else:
result[str(attrib)] = serialize(
request.__dict__[attrib], depth
)
except Exception:
pass
return json.dumps(result)


class InstrumentedStreamReader(ObjectProxy): # type: ignore
# ObjectProxy missing context manager - https://github.com/GrahamDumpleton/wrapt/issues/73
def __init__(self, wrapped, tracer):
super().__init__(wrapped)
self._tracer = tracer

async def __aenter__(self) -> Any:
return await self.__wrapped__.__aenter__()

async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> Any:
return await self.__wrapped__.__aexit__(exc_type, exc_value, traceback)

async def __aiter__(self) -> AsyncGenerator[Any, None]:
from mcp.types import JSONRPCMessage, JSONRPCRequest

async for item in self.__wrapped__:
request = cast(JSONRPCMessage, item).root
if not isinstance(request, JSONRPCRequest):
yield item
continue

if request.params:
meta = request.params.get("_meta")
if meta:
ctx = propagate.extract(meta)
restore = context.attach(ctx)
try:
yield item
continue
finally:
context.detach(restore)
yield item


class InstrumentedStreamWriter(ObjectProxy): # type: ignore
# ObjectProxy missing context manager - https://github.com/GrahamDumpleton/wrapt/issues/73
def __init__(self, wrapped, tracer):
super().__init__(wrapped)
self._tracer = tracer

async def __aenter__(self) -> Any:
return await self.__wrapped__.__aenter__()

async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> Any:
return await self.__wrapped__.__aexit__(exc_type, exc_value, traceback)

async def send(self, item: Any) -> Any:
from mcp.types import JSONRPCMessage, JSONRPCRequest

request = cast(JSONRPCMessage, item).root

with self._tracer.start_as_current_span("ResponseStreamWriter") as span:
if hasattr(request, "result"):
span.set_attribute(
SpanAttributes.MCP_RESPONSE_VALUE, f"{serialize(request.result)}"
)
if hasattr(request, "id"):
span.set_attribute(SpanAttributes.MCP_REQUEST_ID, f"{request.id}")

if not isinstance(request, JSONRPCRequest):
return await self.__wrapped__.send(item)
meta = None
if not request.params:
request.params = {}
meta = request.params.setdefault("_meta", {})

propagate.get_global_textmap().inject(meta)
return await self.__wrapped__.send(item)


@dataclass(slots=True, frozen=True)
class ItemWithContext:
item: Any
ctx: context.Context


class ContextSavingStreamWriter(ObjectProxy): # type: ignore
# ObjectProxy missing context manager - https://github.com/GrahamDumpleton/wrapt/issues/73
def __init__(self, wrapped, tracer):
super().__init__(wrapped)
self._tracer = tracer

async def __aenter__(self) -> Any:
return await self.__wrapped__.__aenter__()

async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> Any:
return await self.__wrapped__.__aexit__(exc_type, exc_value, traceback)

async def send(self, item: Any) -> Any:
with self._tracer.start_as_current_span("RequestStreamWriter") as span:
if hasattr(item, "request_id"):
span.set_attribute(SpanAttributes.MCP_REQUEST_ID, f"{item.request_id}")
if hasattr(item, "request"):
if hasattr(item.request, "root"):
if hasattr(item.request.root, "method"):
span.set_attribute(
SpanAttributes.MCP_METHOD_NAME,
f"{item.request.root.method}",
)
if hasattr(item.request.root, "params"):
span.set_attribute(
SpanAttributes.MCP_REQUEST_ARGUMENT,
f"{serialize(item.request.root.params)}",
)
ctx = context.get_current()
return await self.__wrapped__.send(ItemWithContext(item, ctx))


class ContextAttachingStreamReader(ObjectProxy): # type: ignore
# ObjectProxy missing context manager - https://github.com/GrahamDumpleton/wrapt/issues/73
def __init__(self, wrapped, tracer):
super().__init__(wrapped)
self._tracer = tracer

async def __aenter__(self) -> Any:
return await self.__wrapped__.__aenter__()

async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> Any:
return await self.__wrapped__.__aexit__(exc_type, exc_value, traceback)

async def __aiter__(self) -> AsyncGenerator[Any, None]:
async for item in self.__wrapped__:
item_with_context = cast(ItemWithContext, item)
restore = context.attach(item_with_context.ctx)
try:
yield item_with_context.item
finally:
context.detach(restore)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__version__ = "0.39.1"
Loading