-
Notifications
You must be signed in to change notification settings - Fork 25
[feat] Contribute mcp Instrumentor #435
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 3 commits
d8a0ee3
19b25ce
b2bad7c
49649e7
442a45a
10d91ac
0e6e339
6a5f9d6
db1af61
a6ba4c4
4ea1757
3f576c2
f54609e
f8e7172
1a9ce26
fb9d1c7
7b9a8f4
de0ec1d
43526f6
bd28ccc
70b0da3
5f4f773
0af52d9
0d10a82
775f0b1
141f93c
176fd90
0ea9695
e6c479e
1f4f476
8621644
c78df18
5f70570
064c71c
c97c8df
dc2c730
76c2405
323b87a
3902c60
51aaba9
9c0e57a
dc91c62
d7666ca
aab4e7d
5c1aeec
de1020e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
# MCP Instrumentor | ||
|
||
OpenTelemetry MCP instrumentation package. | ||
|
||
## Installation | ||
|
||
```bash | ||
pip install mcpinstrumentor | ||
``` | ||
|
||
## Usage | ||
|
||
```python | ||
from mcpinstrumentor import MCPInstrumentor | ||
|
||
MCPInstrumentor().instrument() | ||
``` |
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. remove this from committed files |
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,191 @@ | ||||||||
import logging | ||||||||
from typing import Any, Collection | ||||||||
|
||||||||
from openinference.instrumentation.mcp.package import _instruments | ||||||||
from wrapt import register_post_import_hook, wrap_function_wrapper | ||||||||
|
||||||||
from opentelemetry import trace | ||||||||
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor | ||||||||
from opentelemetry.instrumentation.utils import unwrap | ||||||||
|
||||||||
|
||||||||
def setup_loggertwo(): | ||||||||
logger = logging.getLogger("loggertwo") | ||||||||
logger.setLevel(logging.DEBUG) | ||||||||
handler = logging.FileHandler("loggertwo.log", mode="w") | ||||||||
handler.setLevel(logging.DEBUG) | ||||||||
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") | ||||||||
handler.setFormatter(formatter) | ||||||||
if not logger.handlers: | ||||||||
logger.addHandler(handler) | ||||||||
return logger | ||||||||
|
||||||||
|
||||||||
loggertwo = setup_loggertwo() | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove this logger setup code There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it, thank you! |
||||||||
|
||||||||
|
||||||||
class MCPInstrumentor(BaseInstrumentor): | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: can you organize your functions top-to-bottom (i.e. most important functions at top _wrap_send_request and _wrap_handle_request and then helper functions) |
||||||||
""" | ||||||||
An instrumenter for MCP. | ||||||||
""" | ||||||||
|
||||||||
def instrumentation_dependencies(self) -> Collection[str]: | ||||||||
return _instruments | ||||||||
|
||||||||
def _instrument(self, **kwargs: Any) -> None: | ||||||||
tracer_provider = kwargs.get("tracer_provider") # Move this line up | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. remove inline comment |
||||||||
if tracer_provider: | ||||||||
self.tracer_provider = tracer_provider | ||||||||
else: | ||||||||
self.tracer_provider = None | ||||||||
register_post_import_hook( | ||||||||
lambda _: wrap_function_wrapper( | ||||||||
"mcp.shared.session", | ||||||||
"BaseSession.send_request", | ||||||||
self._send_request_wrapper, | ||||||||
), | ||||||||
"mcp.shared.session", | ||||||||
) | ||||||||
register_post_import_hook( | ||||||||
lambda _: wrap_function_wrapper( | ||||||||
"mcp.server.lowlevel.server", | ||||||||
"Server._handle_request", | ||||||||
self._server_handle_request_wrapper, | ||||||||
), | ||||||||
"mcp.server.lowlevel.server", | ||||||||
) | ||||||||
|
||||||||
def _uninstrument(self, **kwargs: Any) -> None: | ||||||||
unwrap("mcp.shared.session", "BaseSession.send_request") | ||||||||
unwrap("mcp.server.lowlevel.server", "Server._handle_request") | ||||||||
|
||||||||
def handle_attributes(self, span, request, is_client=True): | ||||||||
import mcp.types as types | ||||||||
|
||||||||
operation = "Server Handle Request" | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
if isinstance(request, types.ListToolsRequest): | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have duplicate code where we check
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you for the suggestion! I understand your point but I feel like we should keep it because each function handles distinct logic specific to client or server. To me, that's easier to read and if things go wrong, it is also easier to test fixes like I can just add an attribute to the client function to see what changes on CloudWatch. |
||||||||
operation = "ListTool" | ||||||||
span.set_attribute("mcp.list_tools", True) | ||||||||
elif isinstance(request, types.CallToolRequest): | ||||||||
if hasattr(request, "params") and hasattr(request.params, "name"): | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will this error if we directly check for request.params.name? |
||||||||
operation = request.params.name | ||||||||
span.set_attribute("mcp.call_tool", True) | ||||||||
if is_client: | ||||||||
self._add_client_attributes(span, operation, request) | ||||||||
else: | ||||||||
self._add_server_attributes(span, operation, request) | ||||||||
|
||||||||
def _add_client_attributes(self, span, operation, request): | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The attribute names we are setting should be defined here. Import to use the variable name instead of the explicit string. If we are defining our own attribute like "tool.name", we should also create and use a variable for it. (PS should this be gen_ai.tool.name?) |
||||||||
span.set_attribute("span.kind", "CLIENT") | ||||||||
span.set_attribute("aws.remote.service", "Appsignals MCP Server") | ||||||||
span.set_attribute("aws.remote.operation", operation) | ||||||||
if hasattr(request, "params") and hasattr(request.params, "name"): | ||||||||
span.set_attribute("tool.name", request.params.name) | ||||||||
|
||||||||
def _add_server_attributes(self, span, operation, request): | ||||||||
span.set_attribute("server_side", True) | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as above. If "server_side" is defined by you then you should create a variable, otherwise import the predefined one. |
||||||||
span.set_attribute("aws.span.kind", "SERVER") | ||||||||
if hasattr(request, "params") and hasattr(request.params, "name"): | ||||||||
span.set_attribute("tool.name", request.params.name) | ||||||||
|
||||||||
def _inject_trace_context(self, request_data, span_ctx): | ||||||||
if "params" not in request_data: | ||||||||
request_data["params"] = {} | ||||||||
if "_meta" not in request_data["params"]: | ||||||||
request_data["params"]["_meta"] = {} | ||||||||
request_data["params"]["_meta"]["trace_context"] = {"trace_id": span_ctx.trace_id, "span_id": span_ctx.span_id} | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we rename There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it, thank you! |
||||||||
|
||||||||
# Send Request Wrapper | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. remove comment |
||||||||
def _send_request_wrapper(self, wrapped, instance, args, kwargs): | ||||||||
""" | ||||||||
Changes made: | ||||||||
The wrapper intercepts the request before sending, injects distributed tracing context into the | ||||||||
request's params._meta field and creates OpenTelemetry spans. The wrapper does not change anything | ||||||||
else from the original function's behavior because it reconstructs the request object with the same | ||||||||
type and calling the original function with identical parameters. | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: probably don't need to talk about how it preserves the function's behavior. More importantly, what does the wrapper introduce? (adds attributes, etc.) same for the server wrapper |
||||||||
""" | ||||||||
|
||||||||
async def async_wrapper(): | ||||||||
if self.tracer_provider is None: | ||||||||
tracer = trace.get_tracer("mcp.client") | ||||||||
else: | ||||||||
tracer = self.tracer_provider.get_tracer("mcp.client") | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. consider moving the above logic into a separate function since its duplicated in the server. You could pass in either mcp.client or mcp.server. |
||||||||
with tracer.start_as_current_span("client.send_request", kind=trace.SpanKind.CLIENT) as span: | ||||||||
span_ctx = span.get_span_context() | ||||||||
request = args[0] if len(args) > 0 else kwargs.get("request") | ||||||||
if request: | ||||||||
req_root = request.root if hasattr(request, "root") else request | ||||||||
|
||||||||
self.handle_attributes(span, req_root, True) | ||||||||
request_data = request.model_dump(by_alias=True, mode="json", exclude_none=True) | ||||||||
self._inject_trace_context(request_data, span_ctx) | ||||||||
# Reconstruct request object with injected trace context | ||||||||
modified_request = type(request).model_validate(request_data) | ||||||||
if len(args) > 0: | ||||||||
new_args = (modified_request,) + args[1:] | ||||||||
result = await wrapped(*new_args, **kwargs) | ||||||||
else: | ||||||||
kwargs["request"] = modified_request | ||||||||
result = await wrapped(*args, **kwargs) | ||||||||
else: | ||||||||
result = await wrapped(*args, **kwargs) | ||||||||
return result | ||||||||
|
||||||||
return async_wrapper() | ||||||||
|
||||||||
def getname(self, req): | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: make function name more descriptive, like _span_name_from_request There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will do. Thanks for suggestion! |
||||||||
span_name = "unknown" | ||||||||
import mcp.types as types | ||||||||
|
||||||||
if isinstance(req, types.ListToolsRequest): | ||||||||
span_name = "tools/list" | ||||||||
elif isinstance(req, types.CallToolRequest): | ||||||||
if hasattr(req, "params") and hasattr(req.params, "name"): | ||||||||
span_name = f"tools/{req.params.name}" | ||||||||
else: | ||||||||
span_name = "unknown" | ||||||||
return span_name | ||||||||
|
||||||||
# Handle Request Wrapper | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. remove comment |
||||||||
async def _server_handle_request_wrapper(self, wrapped, instance, args, kwargs): | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would be nice to have type hints (see random example) here for more clearer readability. |
||||||||
""" | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be helpful if you can add a comment of what the request object looks like. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it, thanks for the suggestion! |
||||||||
Changes made: | ||||||||
This wrapper intercepts requests before processing, extracts distributed tracing context from | ||||||||
the request's params._meta field, and creates server-side OpenTelemetry spans linked to the client spans. | ||||||||
The wrapper also does not change the original function's behavior by calling it with identical parameters | ||||||||
ensuring no breaking changes to the MCP server functionality. | ||||||||
""" | ||||||||
req = args[1] if len(args) > 1 else None | ||||||||
trace_context = None | ||||||||
|
||||||||
if req and hasattr(req, "params") and req.params and hasattr(req.params, "meta") and req.params.meta: | ||||||||
trace_context = req.params.meta.trace_context | ||||||||
if trace_context: | ||||||||
|
||||||||
if self.tracer_provider is None: | ||||||||
tracer = trace.get_tracer("mcp.server") | ||||||||
else: | ||||||||
tracer = self.tracer_provider.get_tracer("mcp.server") | ||||||||
trace_id = trace_context.get("trace_id") | ||||||||
span_id = trace_context.get("span_id") | ||||||||
span_context = trace.SpanContext( | ||||||||
trace_id=trace_id, | ||||||||
span_id=span_id, | ||||||||
is_remote=True, | ||||||||
trace_flags=trace.TraceFlags(trace.TraceFlags.SAMPLED), | ||||||||
trace_state=trace.TraceState(), | ||||||||
) | ||||||||
span_name = self.getname(req) | ||||||||
with tracer.start_as_current_span( | ||||||||
span_name, | ||||||||
kind=trace.SpanKind.SERVER, | ||||||||
context=trace.set_span_in_context(trace.NonRecordingSpan(span_context)), | ||||||||
) as span: | ||||||||
self.handle_attributes(span, req, False) | ||||||||
result = await wrapped(*args, **kwargs) | ||||||||
return result | ||||||||
else: | ||||||||
return await wrapped( | ||||||||
*args, | ||||||||
**kwargs, | ||||||||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
[build-system] | ||
requires = ["hatchling"] | ||
build-backend = "hatchling.build" | ||
|
||
[project] | ||
name = "amazon-opentelemetry-distro-mcpinstrumentor" | ||
version = "0.1.0" | ||
description = "OpenTelemetry MCP instrumentation for AWS Distro" | ||
readme = "README.md" | ||
license = "Apache-2.0" | ||
requires-python = ">=3.9" | ||
authors = [ | ||
{ name = "Johnny Lin", email = "[email protected]" }, | ||
] | ||
classifiers = [ | ||
"Development Status :: 4 - Beta", | ||
"Intended Audience :: Developers", | ||
"License :: OSI Approved :: Apache Software License", | ||
"Programming Language :: Python", | ||
"Programming Language :: Python :: 3", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks likes this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will go ahead and fix that. Thanks! |
||
"Programming Language :: Python :: 3.9", | ||
"Programming Language :: Python :: 3.10", | ||
"Programming Language :: Python :: 3.11", | ||
"Programming Language :: Python :: 3.12", | ||
"Programming Language :: Python :: 3.13", | ||
] | ||
dependencies = [ | ||
"opentelemetry-api", | ||
"opentelemetry-instrumentation", | ||
"opentelemetry-semantic-conventions", | ||
"wrapt", | ||
"opentelemetry-sdk", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you could be missing some dependencies here (like mcp or openinference), check dependencies that had to be installed manually outside of this list and add them here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ^^ There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it. Thank you! |
||
] | ||
|
||
[project.optional-dependencies] | ||
instruments = ["mcp"] | ||
|
||
[project.entry-points.opentelemetry_instrumentor] | ||
mcp = "mcpinstrumentor:MCPInstrumentor" | ||
|
||
[tool.hatch.build.targets.sdist] | ||
include = [ | ||
"mcpinstrumentor.py", | ||
"README.md" | ||
] | ||
|
||
[tool.hatch.build.targets.wheel] | ||
packages = ["."] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file is used to exclude certain directories from checks. You can remove these as venvs should normally be created in the root directory (covered by above paths)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed to this. Thank you for suggestion!