-
Notifications
You must be signed in to change notification settings - Fork 25
[feat] Contribute mcp Instrumentor #435
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 22 commits
d8a0ee3
19b25ce
b2bad7c
49649e7
442a45a
10d91ac
0e6e339
6a5f9d6
db1af61
a6ba4c4
4ea1757
3f576c2
f54609e
f8e7172
1a9ce26
fb9d1c7
7b9a8f4
de0ec1d
43526f6
bd28ccc
70b0da3
5f4f773
0af52d9
0d10a82
775f0b1
141f93c
176fd90
0ea9695
e6c479e
1f4f476
8621644
c78df18
5f70570
064c71c
c97c8df
dc2c730
76c2405
323b87a
3902c60
51aaba9
9c0e57a
dc91c62
d7666ca
aab4e7d
5c1aeec
de1020e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
# MCP Instrumentor | ||
|
||
OpenTelemetry instrumentation for Model Context Protocol (MCP). | ||
|
||
## Installation | ||
|
||
Included in AWS OpenTelemetry Distro: | ||
|
||
```bash | ||
pip install aws-opentelemetry-distro | ||
``` | ||
|
||
## Usage | ||
|
||
Automatically enabled with: | ||
|
||
```bash | ||
opentelemetry-instrument python your_mcp_app.py | ||
``` | ||
|
||
## Configuration | ||
|
||
- `MCP_INSTRUMENTATION_SERVER_NAME`: Override default server name (default: "mcp server") | ||
|
||
## Spans Created | ||
|
||
- **Client**: | ||
- Initialize: `mcp.initialize` | ||
- List Tools: `mcp.list_tools` | ||
- Call Tool: `mcp.call_tool.{tool_name}` | ||
- **Server**: `tools/initialize`, `tools/list`, `tools/{tool_name}` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
# SPDX-License-Identifier: Apache-2.0 |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,208 @@ | ||||||
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||||||
# SPDX-License-Identifier: Apache-2.0 | ||||||
from typing import Any, Callable, Collection, Dict, Tuple | ||||||
|
||||||
from mcp import ClientRequest | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you lazy load this import like you did with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed! Thanks! |
||||||
from wrapt import register_post_import_hook, wrap_function_wrapper | ||||||
|
||||||
from opentelemetry import trace | ||||||
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor | ||||||
from opentelemetry.instrumentation.utils import unwrap | ||||||
from .semconv import MCPAttributes, MCPSpanNames, MCPOperations, MCPTraceContext, MCPEnvironmentVariables | ||||||
_instruments = ("mcp >= 1.6.0",) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
|
||||||
class MCPInstrumentor(BaseInstrumentor): | ||||||
""" | ||||||
An instrumenter for MCP. | ||||||
""" | ||||||
|
||||||
def __init__(self): | ||||||
super().__init__() | ||||||
self.tracer = None | ||||||
|
||||||
@staticmethod | ||||||
def instrumentation_dependencies() -> Collection[str]: | ||||||
return _instruments | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks! |
||||||
|
||||||
def _instrument(self, **kwargs: Any) -> None: | ||||||
tracer_provider = kwargs.get("tracer_provider") | ||||||
if tracer_provider: | ||||||
self.tracer = tracer_provider.get_tracer("instrumentation.mcp") | ||||||
else: | ||||||
self.tracer = trace.get_tracer("instrumentation.mcp") | ||||||
register_post_import_hook( | ||||||
lambda _: wrap_function_wrapper( | ||||||
"mcp.shared.session", | ||||||
"BaseSession.send_request", | ||||||
self._wrap_send_request, | ||||||
), | ||||||
"mcp.shared.session", | ||||||
) | ||||||
register_post_import_hook( | ||||||
lambda _: wrap_function_wrapper( | ||||||
"mcp.server.lowlevel.server", | ||||||
"Server._handle_request", | ||||||
self._wrap_handle_request, | ||||||
), | ||||||
"mcp.server.lowlevel.server", | ||||||
) | ||||||
|
||||||
@staticmethod | ||||||
def _uninstrument(**kwargs: Any) -> None: | ||||||
unwrap("mcp.shared.session", "BaseSession.send_request") | ||||||
unwrap("mcp.server.lowlevel.server", "Server._handle_request") | ||||||
|
||||||
# Send Request Wrapper | ||||||
def _wrap_send_request( | ||||||
self, wrapped: Callable, instance: Any, args: Tuple[Any, ...], kwargs: Dict[str, Any] | ||||||
) -> Callable: | ||||||
""" | ||||||
Changes made: | ||||||
The wrapper intercepts the request before sending, injects distributed tracing context into the | ||||||
request's params._meta field and creates OpenTelemetry spans. The wrapper does not change anything | ||||||
else from the original function's behavior because it reconstructs the request object with the same | ||||||
type and calling the original function with identical parameters. | ||||||
""" | ||||||
|
||||||
async def async_wrapper(): | ||||||
with self.tracer.start_as_current_span(MCPSpanNames.CLIENT_SEND_REQUEST, kind=trace.SpanKind.CLIENT) as span: | ||||||
span_ctx = span.get_span_context() | ||||||
request = args[0] if len(args) > 0 else kwargs.get("request") | ||||||
if request: | ||||||
req_root = request.root if hasattr(request, "root") else request | ||||||
|
||||||
self._generate_mcp_attributes(span, req_root, is_client=True) | ||||||
request_data = request.model_dump(by_alias=True, mode="json", exclude_none=True) | ||||||
self._inject_trace_context(request_data, span_ctx) | ||||||
# Reconstruct request object with injected trace context | ||||||
modified_request = type(request).model_validate(request_data) | ||||||
if len(args) > 0: | ||||||
new_args = (modified_request,) + args[1:] | ||||||
result = await wrapped(*new_args, **kwargs) | ||||||
else: | ||||||
kwargs["request"] = modified_request | ||||||
result = await wrapped(*args, **kwargs) | ||||||
else: | ||||||
result = await wrapped(*args, **kwargs) | ||||||
return result | ||||||
|
||||||
return async_wrapper() | ||||||
|
||||||
# Handle Request Wrapper | ||||||
async def _wrap_handle_request( | ||||||
self, wrapped: Callable, instance: Any, args: Tuple[Any, ...], kwargs: Dict[str, Any] | ||||||
) -> Any: | ||||||
""" | ||||||
Changes made: | ||||||
This wrapper intercepts requests before processing, extracts distributed tracing context from | ||||||
the request's params._meta field, and creates server-side OpenTelemetry spans linked to the client spans. | ||||||
The wrapper also does not change the original function's behavior by calling it with identical parameters | ||||||
ensuring no breaking changes to the MCP server functionality. | ||||||
|
||||||
request (args[1]) is typically an instance of CallToolRequest or ListToolsRequest | ||||||
and should have the structure: | ||||||
request.params.meta.traceparent -> "00-<trace_id>-<span_id>-01" | ||||||
""" | ||||||
req = args[1] if len(args) > 1 else None | ||||||
traceparent = None | ||||||
|
||||||
if req and hasattr(req, "params") and req.params and hasattr(req.params, "meta") and req.params.meta: | ||||||
traceparent = getattr(req.params.meta, MCPTraceContext.TRACEPARENT_HEADER, None) | ||||||
span_context = self._extract_span_context_from_traceparent(traceparent) if traceparent else None | ||||||
if span_context: | ||||||
span_name = self._get_mcp_operation(req) | ||||||
with self.tracer.start_as_current_span( | ||||||
span_name, | ||||||
kind=trace.SpanKind.SERVER, | ||||||
context=trace.set_span_in_context(trace.NonRecordingSpan(span_context)), | ||||||
) as span: | ||||||
self._generate_mcp_attributes(span, req, False) | ||||||
result = await wrapped(*args, **kwargs) | ||||||
return result | ||||||
else: | ||||||
return await wrapped(*args, **kwargs) | ||||||
|
||||||
def _generate_mcp_attributes(self, span: trace.Span, request: ClientRequest, is_client: bool) -> None: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can also be a static method There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This calls other static methods (add_client_attributes and add_server_attributes), so I think keeping _generate_mcp_attributes as an instance method is cleaner. Otherwise, we’d have to call them like MCPInstrumentor.add_client_attributes, which feels less natural in this context. Let me know if I'm wrong. Thank you! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed! Thanks! |
||||||
import mcp.types as types # pylint: disable=import-outside-toplevel,consider-using-from-import | ||||||
|
||||||
operation = MCPOperations.UNKNOWN_OPERATION | ||||||
|
||||||
if isinstance(request, types.ListToolsRequest): | ||||||
operation = MCPOperations.LIST_TOOL | ||||||
span.set_attribute(MCPAttributes.MCP_LIST_TOOLS, True) | ||||||
if is_client: | ||||||
span.update_name(MCPSpanNames.CLIENT_LIST_TOOLS) | ||||||
elif isinstance(request, types.CallToolRequest): | ||||||
operation = request.params.name | ||||||
span.set_attribute(MCPAttributes.MCP_CALL_TOOL, True) | ||||||
if is_client: | ||||||
span.update_name(MCPSpanNames.client_call_tool(request.params.name)) | ||||||
elif isinstance(request, types.InitializeRequest): | ||||||
operation = MCPOperations.INITIALIZE | ||||||
span.set_attribute(MCPAttributes.MCP_INITIALIZE, True) | ||||||
if is_client: | ||||||
span.update_name(MCPSpanNames.CLIENT_INITIALIZE) | ||||||
|
||||||
if is_client: | ||||||
self._add_client_attributes(span, operation, request) | ||||||
else: | ||||||
self._add_server_attributes(span, operation, request) | ||||||
|
||||||
@staticmethod | ||||||
def _inject_trace_context(request_data: Dict[str, Any], span_ctx) -> None: | ||||||
if "params" not in request_data: | ||||||
request_data["params"] = {} | ||||||
if "_meta" not in request_data["params"]: | ||||||
request_data["params"]["_meta"] = {} | ||||||
trace_id_hex = f"{span_ctx.trace_id:032x}" | ||||||
span_id_hex = f"{span_ctx.span_id:016x}" | ||||||
trace_flags = MCPTraceContext.TRACE_FLAGS_SAMPLED | ||||||
traceparent = f"{MCPTraceContext.TRACEPARENT_VERSION}-{trace_id_hex}-{span_id_hex}-{trace_flags}" | ||||||
request_data["params"]["_meta"][MCPTraceContext.TRACEPARENT_HEADER] = traceparent | ||||||
|
||||||
@staticmethod | ||||||
def _extract_span_context_from_traceparent(traceparent: str): | ||||||
parts = traceparent.split("-") | ||||||
if len(parts) == 4: | ||||||
try: | ||||||
trace_id = int(parts[1], 16) | ||||||
span_id = int(parts[2], 16) | ||||||
return trace.SpanContext( | ||||||
trace_id=trace_id, | ||||||
span_id=span_id, | ||||||
is_remote=True, | ||||||
trace_flags=trace.TraceFlags(trace.TraceFlags.SAMPLED), | ||||||
trace_state=trace.TraceState(), | ||||||
) | ||||||
except ValueError: | ||||||
return None | ||||||
return None | ||||||
|
||||||
@staticmethod | ||||||
def _get_mcp_operation(req: ClientRequest) -> str: | ||||||
import mcp.types as types # pylint: disable=import-outside-toplevel,consider-using-from-import | ||||||
|
||||||
span_name = "unknown" | ||||||
|
||||||
if isinstance(req, types.ListToolsRequest): | ||||||
span_name = MCPSpanNames.TOOLS_LIST | ||||||
elif isinstance(req, types.CallToolRequest): | ||||||
span_name = MCPSpanNames.tools_call(req.params.name) | ||||||
elif isinstance(req, types.InitializeRequest): | ||||||
span_name = MCPSpanNames.TOOLS_INITIALIZE | ||||||
return span_name | ||||||
|
||||||
@staticmethod | ||||||
def _add_client_attributes(span: trace.Span, operation: str, request: ClientRequest) -> None: | ||||||
import os # pylint: disable=import-outside-toplevel | ||||||
|
||||||
service_name = os.environ.get(MCPEnvironmentVariables.SERVER_NAME, "mcp server") | ||||||
span.set_attribute(MCPAttributes.AWS_REMOTE_SERVICE, service_name) | ||||||
span.set_attribute(MCPAttributes.AWS_REMOTE_OPERATION, operation) | ||||||
if hasattr(request, "params") and request.params and hasattr(request.params, "name"): | ||||||
span.set_attribute(MCPAttributes.MCP_TOOL_NAME, request.params.name) | ||||||
|
||||||
@staticmethod | ||||||
def _add_server_attributes(span: trace.Span, operation: str, request: ClientRequest) -> None: | ||||||
if hasattr(request, "params") and request.params and hasattr(request.params, "name"): | ||||||
span.set_attribute(MCPAttributes.MCP_TOOL_NAME, request.params.name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well this doesn't make sense, your code indicates that your instrumentation library instruments mcp >= 1.6.0 but here you have mcp >= 1.1.0. So which is it?
_instruments = ("mcp >= 1.6.0",)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, this is my mistake. I was able to see that there's no module named "mcp.server.lowlevel" with version 1.1. The minimum version for my instrumentor to be imported is 1.3. However, there's some significant changes to how client message handling is done in the version of 1.6. Explanation from openinference: Arize-ai/openinference#1563 . I will change it to mcp >= 1.6.0. Thank you for pointing it out!