From 15f47c65dc3b9de962fd83e3344db146a13ef9be Mon Sep 17 00:00:00 2001 From: Liudmila Molkova Date: Wed, 9 Apr 2025 18:54:11 -0700 Subject: [PATCH 1/9] Add more attributes and rename things --- Directory.Packages.props | 5 + samples/ChatWithTools/ChatWithTools.csproj | 2 + samples/ChatWithTools/Program.cs | 39 ++- .../EverythingServer/EverythingServer.csproj | 4 + samples/EverythingServer/Program.cs | 17 ++ src/ModelContextProtocol/Shared/McpSession.cs | 259 +++++++++++++----- 6 files changed, 259 insertions(+), 67 deletions(-) diff --git a/Directory.Packages.props b/Directory.Packages.props index acdc0ee88..b741c9af7 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -67,5 +67,10 @@ + + + + + \ No newline at end of file diff --git a/samples/ChatWithTools/ChatWithTools.csproj b/samples/ChatWithTools/ChatWithTools.csproj index 8e08a455d..13bdafc0c 100644 --- a/samples/ChatWithTools/ChatWithTools.csproj +++ b/samples/ChatWithTools/ChatWithTools.csproj @@ -15,6 +15,8 @@ + + diff --git a/samples/ChatWithTools/Program.cs b/samples/ChatWithTools/Program.cs index dd09a7c97..4f224ad79 100644 --- a/samples/ChatWithTools/Program.cs +++ b/samples/ChatWithTools/Program.cs @@ -3,6 +3,35 @@ using Microsoft.Extensions.AI; using OpenAI; +using OpenTelemetry; +using OpenTelemetry.Trace; +using System.Diagnostics; +using Microsoft.Extensions.Logging; +using OpenTelemetry.Logs; +using OpenTelemetry.Metrics; + +using var tracerProvider = Sdk.CreateTracerProviderBuilder() + .AddHttpClientInstrumentation() + .AddSource("*") + .AddOtlpExporter() + .Build(); +using var metricsProvider = Sdk.CreateMeterProviderBuilder() + .AddHttpClientInstrumentation() + .AddMeter("*") + .AddOtlpExporter() + .Build(); + +using var loggerFactory = LoggerFactory.Create(builder => + { + builder.AddOpenTelemetry(opt => + { + opt.IncludeFormattedMessage = true; + opt.IncludeScopes = true; + opt.AddOtlpExporter(); + }); + }); + +var mainSource = new ActivitySource("ModelContextProtocol.Client.Sample"); // Connect to an MCP server Console.WriteLine("Connecting client to MCP 'everything' server"); var mcpClient = await McpClientFactory.CreateAsync( @@ -11,7 +40,8 @@ Command = "npx", Arguments = ["-y", "--verbose", "@modelcontextprotocol/server-everything"], Name = "Everything", - })); + }), + loggerFactory: loggerFactory); // Get all available tools Console.WriteLine("Tools available:"); @@ -20,19 +50,24 @@ { Console.WriteLine($" {tool}"); } + Console.WriteLine(); // Create an IChatClient. (This shows using OpenAIClient, but it could be any other IChatClient implementation.) // Provide your own OPENAI_API_KEY via an environment variable. using IChatClient chatClient = new OpenAIClient(Environment.GetEnvironmentVariable("OPENAI_API_KEY")).GetChatClient("gpt-4o-mini").AsIChatClient() - .AsBuilder().UseFunctionInvocation().Build(); + .AsBuilder() + .UseFunctionInvocation() + .UseOpenTelemetry(loggerFactory: loggerFactory, configure: o => o.EnableSensitiveData = true) + .Build(); // Have a conversation, making all tools available to the LLM. List messages = []; while (true) { Console.Write("Q: "); + messages.Add(new(ChatRole.User, Console.ReadLine())); List updates = []; diff --git a/samples/EverythingServer/EverythingServer.csproj b/samples/EverythingServer/EverythingServer.csproj index 3aee2bc2f..3b3c0e64e 100644 --- a/samples/EverythingServer/EverythingServer.csproj +++ b/samples/EverythingServer/EverythingServer.csproj @@ -9,6 +9,10 @@ + + + + diff --git a/samples/EverythingServer/Program.cs b/samples/EverythingServer/Program.cs index a0966fe75..c880cac05 100644 --- a/samples/EverythingServer/Program.cs +++ b/samples/EverythingServer/Program.cs @@ -8,6 +8,9 @@ using ModelContextProtocol; using ModelContextProtocol.Protocol.Types; using ModelContextProtocol.Server; +using OpenTelemetry.Metrics; +using OpenTelemetry.Resources; +using OpenTelemetry.Trace; #pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously @@ -186,6 +189,20 @@ await ctx.Server.RequestSamplingAsync([ return new EmptyResult(); }); + +var resource = ResourceBuilder.CreateEmpty().AddService("mcp.server"); +builder.Services.AddOpenTelemetry() + .WithTracing(b => b.SetResourceBuilder(resource) + .AddOtlpExporter() + .AddSource("*") + .AddAspNetCoreInstrumentation() + .AddHttpClientInstrumentation()) + .WithMetrics(b => b.SetResourceBuilder(resource) + .AddMeter("*") + .AddOtlpExporter() + .AddAspNetCoreInstrumentation() + .AddHttpClientInstrumentation()); + builder.Services.AddSingleton(subscriptions); builder.Services.AddHostedService(); builder.Services.AddHostedService(); diff --git a/src/ModelContextProtocol/Shared/McpSession.cs b/src/ModelContextProtocol/Shared/McpSession.cs index 062d8cf44..403a57187 100644 --- a/src/ModelContextProtocol/Shared/McpSession.cs +++ b/src/ModelContextProtocol/Shared/McpSession.cs @@ -1,4 +1,4 @@ -using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using ModelContextProtocol.Logging; using ModelContextProtocol.Protocol.Messages; @@ -23,10 +23,10 @@ internal sealed class McpSession : IDisposable "mcp.client.session.duration", "Measures the duration of a client session.", longBuckets: true); private static readonly Histogram s_serverSessionDuration = Diagnostics.CreateDurationHistogram( "mcp.server.session.duration", "Measures the duration of a server session.", longBuckets: true); - private static readonly Histogram s_clientRequestDuration = Diagnostics.CreateDurationHistogram( - "rpc.client.duration", "Measures the duration of outbound RPC.", longBuckets: false); - private static readonly Histogram s_serverRequestDuration = Diagnostics.CreateDurationHistogram( - "rpc.server.duration", "Measures the duration of inbound RPC.", longBuckets: false); + private static readonly Histogram s_clientOperationDuration = Diagnostics.CreateDurationHistogram( + "mcp.client.operation.duration", "Measures the duration of outbound message exchange.", longBuckets: false); + private static readonly Histogram s_serverOperationDuration = Diagnostics.CreateDurationHistogram( + "rpc.server.operation.duration", "Measures the duration of inbound message processing.", longBuckets: false); private readonly bool _isServer; private readonly string _transportKind; @@ -35,6 +35,8 @@ internal sealed class McpSession : IDisposable private readonly NotificationHandlers _notificationHandlers; private readonly long _sessionStartingTimestamp = Stopwatch.GetTimestamp(); + private readonly DistributedContextPropagator? _propagator = DistributedContextPropagator.Current; + /// Collection of requests sent on this session and waiting for responses. private readonly ConcurrentDictionary> _pendingRequests = []; /// @@ -184,12 +186,18 @@ await _transport.SendMessageAsync(new JsonRpcError private async Task HandleMessageAsync(IJsonRpcMessage message, CancellationToken cancellationToken) { - Histogram durationMetric = _isServer ? s_serverRequestDuration : s_clientRequestDuration; + Histogram durationMetric = _isServer ? s_serverOperationDuration : s_clientOperationDuration; string method = GetMethodName(message); long? startingTimestamp = durationMetric.Enabled ? Stopwatch.GetTimestamp() : null; - Activity? activity = Diagnostics.ActivitySource.HasListeners() ? - Diagnostics.ActivitySource.StartActivity(CreateActivityName(method)) : + + // TODO: there is a chance that we have current activity from transport - link it to + // the new one + Activity? activity = ShouldInstrument(message) && Diagnostics.ActivitySource.HasListeners()? + Diagnostics.ActivitySource.StartActivity( + CreateActivityName(method), + ActivityKind.Server, + parentContext: ExtractActivityContext(message)) : null; TagList tags = default; @@ -198,17 +206,12 @@ private async Task HandleMessageAsync(IJsonRpcMessage message, CancellationToken { if (addTags) { - AddStandardTags(ref tags, method); + AddTags(ref tags, activity, message, method); } switch (message) { case JsonRpcRequest request: - if (addTags) - { - AddRpcRequestTags(ref tags, activity, request); - } - await HandleRequest(request, cancellationToken).ConfigureAwait(false); break; @@ -227,7 +230,7 @@ private async Task HandleMessageAsync(IJsonRpcMessage message, CancellationToken } catch (Exception e) when (addTags) { - AddExceptionTags(ref tags, e); + AddExceptionTags(ref tags, activity, e); throw; } finally @@ -340,11 +343,11 @@ public async Task SendRequestAsync(JsonRpcRequest request, Canc cancellationToken.ThrowIfCancellationRequested(); - Histogram durationMetric = _isServer ? s_serverRequestDuration : s_clientRequestDuration; + Histogram durationMetric = _isServer ? s_serverOperationDuration : s_clientOperationDuration; string method = request.Method; long? startingTimestamp = durationMetric.Enabled ? Stopwatch.GetTimestamp() : null; - using Activity? activity = Diagnostics.ActivitySource.HasListeners() ? + using Activity? activity = ShouldInstrument(request) && Diagnostics.ActivitySource.HasListeners() ? Diagnostics.ActivitySource.StartActivity(CreateActivityName(method)) : null; @@ -354,6 +357,9 @@ public async Task SendRequestAsync(JsonRpcRequest request, Canc request.Id = new RequestId($"{_id}-{Interlocked.Increment(ref _nextRequestId)}"); } + // propagate trace context, noop if activity is null + _propagator?.Inject(activity, request, InjectContext); + TagList tags = default; bool addTags = activity is { IsAllDataRequested: true } || startingTimestamp is not null; @@ -363,8 +369,7 @@ public async Task SendRequestAsync(JsonRpcRequest request, Canc { if (addTags) { - AddStandardTags(ref tags, method); - AddRpcRequestTags(ref tags, activity, request); + AddTags(ref tags, activity, request, method); } // Expensive logging, use the logging framework to check if the logger is enabled @@ -396,6 +401,10 @@ public async Task SendRequestAsync(JsonRpcRequest request, Canc if (response is JsonRpcResponse success) { + if (addTags) { + MaybeAddErrorTags(ref tags, activity, success); + } + _logger.RequestResponseReceivedPayload(EndpointName, success.Result?.ToJsonString() ?? "null"); _logger.RequestResponseReceived(EndpointName, request.Method); return success; @@ -407,7 +416,7 @@ public async Task SendRequestAsync(JsonRpcRequest request, Canc } catch (Exception ex) when (addTags) { - AddExceptionTags(ref tags, ex); + AddExceptionTags(ref tags, activity, ex); throw; } finally @@ -429,22 +438,25 @@ public async Task SendMessageAsync(IJsonRpcMessage message, CancellationToken ca cancellationToken.ThrowIfCancellationRequested(); - Histogram durationMetric = _isServer ? s_serverRequestDuration : s_clientRequestDuration; + Histogram durationMetric = _isServer ? s_serverOperationDuration : s_clientOperationDuration; string method = GetMethodName(message); long? startingTimestamp = durationMetric.Enabled ? Stopwatch.GetTimestamp() : null; - using Activity? activity = Diagnostics.ActivitySource.HasListeners() ? + using Activity? activity = ShouldInstrument(message) && Diagnostics.ActivitySource.HasListeners() ? Diagnostics.ActivitySource.StartActivity(CreateActivityName(method)) : null; TagList tags = default; bool addTags = activity is { IsAllDataRequested: true } || startingTimestamp is not null; + // propagate trace context, noop if activity is null + _propagator?.Inject(activity, message, InjectContext); + try { if (addTags) { - AddStandardTags(ref tags, method); + AddTags(ref tags, activity, message, method); } if (_logger.IsEnabled(LogLevel.Debug)) @@ -466,7 +478,7 @@ public async Task SendMessageAsync(IJsonRpcMessage message, CancellationToken ca } catch (Exception ex) when (addTags) { - AddExceptionTags(ref tags, ex); + AddExceptionTags(ref tags, activity, ex); throw; } finally @@ -487,66 +499,88 @@ public async Task SendMessageAsync(IJsonRpcMessage message, CancellationToken ca } } - private string CreateActivityName(string method) => - $"mcp.{(_isServer ? "server" : "client")}.{_transportKind}/{method}"; + private string CreateActivityName(string method) => method; private static string GetMethodName(IJsonRpcMessage message) => message switch { JsonRpcRequest request => request.Method, JsonRpcNotification notification => notification.Method, - _ => "unknownMethod", + not null => message.GetType().FullName ?? "unknownMethod", + _ => "unknownMethod" }; - private void AddStandardTags(ref TagList tags, string method) + private void AddTags(ref TagList tags, Activity? activity, IJsonRpcMessage message, string method) { - tags.Add("session.id", _id); tags.Add("rpc.system", "jsonrpc"); - tags.Add("rpc.jsonrpc.version", "2.0"); tags.Add("rpc.method", method); tags.Add("network.transport", _transportKind); - // RPC spans convention also includes: + // RPC convention also includes: // server.address, server.port, client.address, client.port, network.peer.address, network.peer.port, network.type - } - private static void AddRpcRequestTags(ref TagList tags, Activity? activity, JsonRpcRequest request) - { - tags.Add("rpc.jsonrpc.request_id", request.Id.ToString()); - - if (request.Params is JsonObject paramsObj) + if (activity is { IsAllDataRequested: true }) { - switch (request.Method) - { - case RequestMethods.ToolsCall: - case RequestMethods.PromptsGet: - if (paramsObj.TryGetPropertyValue("name", out var prop) && prop?.GetValueKind() is JsonValueKind.String) - { - string name = prop.GetValue(); - tags.Add("mcp.request.params.name", name); - if (activity is not null) - { - activity.DisplayName = $"{request.Method}({name})"; - } - } - break; + // session and request id have high cardinality, so not applying to metric tags + activity.AddTag("mcp.session.id", _id); - case RequestMethods.ResourcesRead: - if (paramsObj.TryGetPropertyValue("uri", out prop) && prop?.GetValueKind() is JsonValueKind.String) - { - string uri = prop.GetValue(); - tags.Add("mcp.request.params.uri", uri); - if (activity is not null) - { - activity.DisplayName = $"{request.Method}({uri})"; - } - } - break; + if (message is IJsonRpcMessageWithId withId) { + activity.AddTag("rpc.jsonrpc.request_id", withId.Id.Id?.ToString()); } } + + JsonObject? paramsObj = message switch + { + JsonRpcRequest request => request.Params as JsonObject, + JsonRpcNotification notification => notification.Params as JsonObject, + _ => null + }; + + if (paramsObj == null) + { + return; + } + + string? target = null; + switch (method) + { + case RequestMethods.ToolsCall: + string? toolName = GetStringProperty(paramsObj, "name"); + if (toolName is not null) + { + tags.Add("mcp.tool.name", toolName); + target = toolName; + } + break; + case RequestMethods.PromptsGet: + string? promptName = GetStringProperty(paramsObj, "name"); + if (promptName is not null) + { + tags.Add("mcp.prompt.name", promptName); + target = promptName; + } + break; + + case RequestMethods.ResourcesRead: + case RequestMethods.ResourcesSubscribe: + case RequestMethods.ResourcesUnsubscribe: + case NotificationMethods.ResourceUpdatedNotification: + string? resourceUri = GetStringProperty(paramsObj, "uri"); + if (resourceUri is not null) + { + tags.Add("mcp.resource.uri", resourceUri); + target = resourceUri; + } + break; + } + + if (activity is { IsAllDataRequested: true }) + { + activity.DisplayName = target == null ? method : $"{method} {target}"; + } } - private static void AddExceptionTags(ref TagList tags, Exception e) + private static void AddExceptionTags(ref TagList tags, Activity? activity, Exception e) { if (e is AggregateException ae && ae.InnerException is not null and not AggregateException) { @@ -558,6 +592,29 @@ private static void AddExceptionTags(ref TagList tags, Exception e) (e as McpException)?.ErrorCode is int errorCode ? errorCode : e is JsonException ? ErrorCodes.ParseError : ErrorCodes.InternalError); + + if (activity is { IsAllDataRequested: true }) + { + activity.SetStatus(ActivityStatusCode.Error, e.Message); + } + } + + private static void MaybeAddErrorTags(ref TagList tags, Activity? activity, JsonRpcResponse response) + { + if (response.Result is JsonObject jsonObject + && jsonObject.TryGetPropertyValue("isError", out var isError) + && isError?.GetValueKind() == JsonValueKind.True) + { + if (activity is { IsAllDataRequested: true }) { + string? content = null; + if (jsonObject.TryGetPropertyValue("content", out var prop) && prop!= null) { + content = prop.ToJsonString(); + } + activity.SetStatus(ActivityStatusCode.Error, content); + } + + tags.Add("error.type", "_OTHER"); + } } private static void FinalizeDiagnostics( @@ -590,8 +647,8 @@ public void Dispose() if (durationMetric.Enabled) { TagList tags = default; - tags.Add("session.id", _id); tags.Add("network.transport", _transportKind); + // TODO server.address, server.port, client.address, client.port, network.peer.address, network.peer.port, network.type durationMetric.Record(GetElapsed(_sessionStartingTimestamp).TotalSeconds, tags); } @@ -614,4 +671,76 @@ private static TimeSpan GetElapsed(long startingTimestamp) => #else new((long)(s_timestampToTicks * (Stopwatch.GetTimestamp() - startingTimestamp))); #endif -} \ No newline at end of file + + private ActivityContext ExtractActivityContext(IJsonRpcMessage message) { + string? traceparent = null; + string? tracestate = null; + _propagator?.ExtractTraceIdAndState(message, ExtractContext, out traceparent, out tracestate); + ActivityContext.TryParse(traceparent, tracestate, true, out var activityContext); + return activityContext; + } + + private static void ExtractContext(object? message, string fieldName, out string? fieldValue, out IEnumerable? fieldValues) { + fieldValues = null; + fieldValue = null; + + JsonNode? parameters = null; + switch (message) + { + case JsonRpcRequest request: + parameters = request.Params; + break; + + case JsonRpcNotification notification: + parameters = notification.Params; + break; + + default: + break; + } + + if (parameters?[fieldName] is JsonValue value && value.GetValueKind() == JsonValueKind.String) { + fieldValue = value.GetValue(); + } + } + + private static void InjectContext(object? message, string key, string value) + { + JsonNode? parameters = null; + switch (message) + { + case JsonRpcRequest request: + parameters = request.Params; + break; + + case JsonRpcNotification notification: + parameters = notification.Params; + break; + + default: + break; + } + + if (parameters is not null && parameters is JsonObject jsonObject && jsonObject[key] == null) { + jsonObject[key] = value; + } + } + + private static string? GetStringProperty(JsonObject parameters, string propName) + { + if (parameters.TryGetPropertyValue(propName, out var prop) && prop?.GetValueKind() is JsonValueKind.String) + { + return prop.GetValue(); + } + + return null; + } + + private bool ShouldInstrument(IJsonRpcMessage message) => + message switch + { + JsonRpcRequest request => true, + JsonRpcNotification notification => notification.Method != NotificationMethods.LoggingMessageNotification, + _ => false + }; +} From e4d11c7affb1af809866f87b76010e9312d991fd Mon Sep 17 00:00:00 2001 From: Liudmila Molkova Date: Wed, 9 Apr 2025 22:13:52 -0700 Subject: [PATCH 2/9] sampling --- .../AspNetCoreSseServer.csproj | 7 ++++ samples/AspNetCoreSseServer/Program.cs | 23 ++++++++++- samples/ChatWithTools/Program.cs | 39 +++++++++++++------ samples/EverythingServer/Program.cs | 5 ++- src/ModelContextProtocol/Shared/McpSession.cs | 11 +++++- 5 files changed, 70 insertions(+), 15 deletions(-) diff --git a/samples/AspNetCoreSseServer/AspNetCoreSseServer.csproj b/samples/AspNetCoreSseServer/AspNetCoreSseServer.csproj index 94a5ccdb9..8274f7cdc 100644 --- a/samples/AspNetCoreSseServer/AspNetCoreSseServer.csproj +++ b/samples/AspNetCoreSseServer/AspNetCoreSseServer.csproj @@ -12,4 +12,11 @@ + + + + + + + diff --git a/samples/AspNetCoreSseServer/Program.cs b/samples/AspNetCoreSseServer/Program.cs index 306a6e8f7..8adf6d9b6 100644 --- a/samples/AspNetCoreSseServer/Program.cs +++ b/samples/AspNetCoreSseServer/Program.cs @@ -1,9 +1,30 @@ using TestServerWithHosting.Tools; +using OpenTelemetry.Logs; +using OpenTelemetry.Metrics; +using OpenTelemetry.Resources; +using OpenTelemetry.Trace; +using AspNetCoreSseServer.Tools; var builder = WebApplication.CreateBuilder(args); builder.Services.AddMcpServer() .WithTools() - .WithTools(); + .WithTools() + .WithTools(); + +var resource = ResourceBuilder.CreateEmpty().AddService("mcp.server"); +builder.Services.AddOpenTelemetry() + .WithTracing(b => b.SetResourceBuilder(resource) + .AddOtlpExporter() + .AddSource("*") + .AddAspNetCoreInstrumentation() + .AddHttpClientInstrumentation()) + .WithMetrics(b => b.SetResourceBuilder(resource) + .AddMeter("*") + .AddOtlpExporter() + .AddAspNetCoreInstrumentation() + .AddHttpClientInstrumentation()) + .WithLogging(b => b.SetResourceBuilder(resource) + .AddOtlpExporter()); var app = builder.Build(); diff --git a/samples/ChatWithTools/Program.cs b/samples/ChatWithTools/Program.cs index 4f224ad79..ff945e3c0 100644 --- a/samples/ChatWithTools/Program.cs +++ b/samples/ChatWithTools/Program.cs @@ -9,6 +9,8 @@ using Microsoft.Extensions.Logging; using OpenTelemetry.Logs; using OpenTelemetry.Metrics; +using ModelContextProtocol.Protocol.Messages; +using ModelContextProtocol.Protocol.Types; using var tracerProvider = Sdk.CreateTracerProviderBuilder() .AddHttpClientInstrumentation() @@ -31,9 +33,21 @@ }); }); -var mainSource = new ActivitySource("ModelContextProtocol.Client.Sample"); // Connect to an MCP server -Console.WriteLine("Connecting client to MCP 'everything' server"); +Console.WriteLine("Connecting client to MCP 'aspnetcore' server"); + + +// Create an IChatClient. (This shows using OpenAIClient, but it could be any other IChatClient implementation.) +// Provide your own OPENAI_API_KEY via an environment variable. +using IChatClient chatClient = + new OpenAIClient(Environment.GetEnvironmentVariable("OPENAI_API_KEY")).GetChatClient("gpt-4o-mini").AsIChatClient() + .AsBuilder() + .UseFunctionInvocation() + .UseOpenTelemetry(loggerFactory: loggerFactory, configure: o => o.EnableSensitiveData = true) + .Build(); + +var samplingHandler = chatClient.CreateSamplingHandler(); + var mcpClient = await McpClientFactory.CreateAsync( new StdioClientTransport(new() { @@ -41,6 +55,16 @@ Arguments = ["-y", "--verbose", "@modelcontextprotocol/server-everything"], Name = "Everything", }), + clientOptions: new McpClientOptions() + { + Capabilities = new ClientCapabilities() + { + Sampling = new SamplingCapability() { SamplingHandler = (param, progress, ct) => { + Console.WriteLine(param?.Meta?.ProgressToken); + return samplingHandler(param, progress, ct); + } } + }, + }, loggerFactory: loggerFactory); // Get all available tools @@ -53,15 +77,6 @@ Console.WriteLine(); -// Create an IChatClient. (This shows using OpenAIClient, but it could be any other IChatClient implementation.) -// Provide your own OPENAI_API_KEY via an environment variable. -using IChatClient chatClient = - new OpenAIClient(Environment.GetEnvironmentVariable("OPENAI_API_KEY")).GetChatClient("gpt-4o-mini").AsIChatClient() - .AsBuilder() - .UseFunctionInvocation() - .UseOpenTelemetry(loggerFactory: loggerFactory, configure: o => o.EnableSensitiveData = true) - .Build(); - // Have a conversation, making all tools available to the LLM. List messages = []; while (true) @@ -71,7 +86,7 @@ messages.Add(new(ChatRole.User, Console.ReadLine())); List updates = []; - await foreach (var update in chatClient.GetStreamingResponseAsync(messages, new() { Tools = [.. tools] })) + await foreach (var update in chatClient.GetStreamingResponseAsync(messages, new() { Tools = [.. tools]})) { Console.Write(update); updates.Add(update); diff --git a/samples/EverythingServer/Program.cs b/samples/EverythingServer/Program.cs index c880cac05..e34e753a6 100644 --- a/samples/EverythingServer/Program.cs +++ b/samples/EverythingServer/Program.cs @@ -8,6 +8,7 @@ using ModelContextProtocol; using ModelContextProtocol.Protocol.Types; using ModelContextProtocol.Server; +using OpenTelemetry.Logs; using OpenTelemetry.Metrics; using OpenTelemetry.Resources; using OpenTelemetry.Trace; @@ -201,7 +202,9 @@ await ctx.Server.RequestSamplingAsync([ .AddMeter("*") .AddOtlpExporter() .AddAspNetCoreInstrumentation() - .AddHttpClientInstrumentation()); + .AddHttpClientInstrumentation()) + .WithLogging(b => b.SetResourceBuilder(resource) + .AddOtlpExporter()); builder.Services.AddSingleton(subscriptions); builder.Services.AddHostedService(); diff --git a/src/ModelContextProtocol/Shared/McpSession.cs b/src/ModelContextProtocol/Shared/McpSession.cs index 403a57187..6b59e76a2 100644 --- a/src/ModelContextProtocol/Shared/McpSession.cs +++ b/src/ModelContextProtocol/Shared/McpSession.cs @@ -197,7 +197,8 @@ private async Task HandleMessageAsync(IJsonRpcMessage message, CancellationToken Diagnostics.ActivitySource.StartActivity( CreateActivityName(method), ActivityKind.Server, - parentContext: ExtractActivityContext(message)) : + parentContext: ExtractActivityContext(message), + links: FromCurrent()) : null; TagList tags = default; @@ -743,4 +744,12 @@ private bool ShouldInstrument(IJsonRpcMessage message) => JsonRpcNotification notification => notification.Method != NotificationMethods.LoggingMessageNotification, _ => false }; + + private static ActivityLink[] FromCurrent() { + if (Activity.Current is { } activity) { + return [new ActivityLink(activity.Context)]; + } + + return Array.Empty(); + } } From 920471891265af112bc44d8c25268efd4e70d379 Mon Sep 17 00:00:00 2001 From: Liudmila Molkova Date: Fri, 11 Apr 2025 17:25:25 -0700 Subject: [PATCH 3/9] update conventions and address feedback --- Directory.Packages.props | 9 +- samples/AspNetCoreSseServer/Program.cs | 20 +-- .../Properties/launchSettings.json | 6 +- samples/ChatWithTools/Program.cs | 39 ++--- .../EverythingServer/EverythingServer.csproj | 1 - samples/EverythingServer/Program.cs | 20 +-- src/ModelContextProtocol/Diagnostics.cs | 78 ++++++++++ src/ModelContextProtocol/Shared/McpSession.cs | 141 +++++------------- .../DiagnosticTests.cs | 98 +++++++++++- 9 files changed, 245 insertions(+), 167 deletions(-) diff --git a/Directory.Packages.props b/Directory.Packages.props index b741c9af7..8ac0a52c3 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -58,6 +58,10 @@ + + + + @@ -67,10 +71,5 @@ - - - - - \ No newline at end of file diff --git a/samples/AspNetCoreSseServer/Program.cs b/samples/AspNetCoreSseServer/Program.cs index 8adf6d9b6..687bb6d50 100644 --- a/samples/AspNetCoreSseServer/Program.cs +++ b/samples/AspNetCoreSseServer/Program.cs @@ -1,30 +1,22 @@ using TestServerWithHosting.Tools; -using OpenTelemetry.Logs; using OpenTelemetry.Metrics; -using OpenTelemetry.Resources; using OpenTelemetry.Trace; -using AspNetCoreSseServer.Tools; +using OpenTelemetry; var builder = WebApplication.CreateBuilder(args); builder.Services.AddMcpServer() .WithTools() - .WithTools() - .WithTools(); + .WithTools(); -var resource = ResourceBuilder.CreateEmpty().AddService("mcp.server"); builder.Services.AddOpenTelemetry() - .WithTracing(b => b.SetResourceBuilder(resource) - .AddOtlpExporter() - .AddSource("*") + .WithTracing(b => b.AddSource("*") .AddAspNetCoreInstrumentation() .AddHttpClientInstrumentation()) - .WithMetrics(b => b.SetResourceBuilder(resource) - .AddMeter("*") - .AddOtlpExporter() + .WithMetrics(b => b.AddMeter("*") .AddAspNetCoreInstrumentation() .AddHttpClientInstrumentation()) - .WithLogging(b => b.SetResourceBuilder(resource) - .AddOtlpExporter()); + .WithLogging() + .UseOtlpExporter(); var app = builder.Build(); diff --git a/samples/AspNetCoreSseServer/Properties/launchSettings.json b/samples/AspNetCoreSseServer/Properties/launchSettings.json index 3b6f145d2..c789fb474 100644 --- a/samples/AspNetCoreSseServer/Properties/launchSettings.json +++ b/samples/AspNetCoreSseServer/Properties/launchSettings.json @@ -6,7 +6,8 @@ "dotnetRunMessages": true, "applicationUrl": "http://localhost:3001", "environmentVariables": { - "ASPNETCORE_ENVIRONMENT": "Development" + "ASPNETCORE_ENVIRONMENT": "Development", + "OTEL_SERVICE_NAME": "sse-server", } }, "https": { @@ -14,7 +15,8 @@ "dotnetRunMessages": true, "applicationUrl": "https://localhost:7133;http://localhost:3001", "environmentVariables": { - "ASPNETCORE_ENVIRONMENT": "Development" + "ASPNETCORE_ENVIRONMENT": "Development", + "OTEL_SERVICE_NAME": "sse-server", } } } diff --git a/samples/ChatWithTools/Program.cs b/samples/ChatWithTools/Program.cs index ff945e3c0..cfb5d0c35 100644 --- a/samples/ChatWithTools/Program.cs +++ b/samples/ChatWithTools/Program.cs @@ -5,11 +5,9 @@ using OpenTelemetry; using OpenTelemetry.Trace; -using System.Diagnostics; using Microsoft.Extensions.Logging; using OpenTelemetry.Logs; using OpenTelemetry.Metrics; -using ModelContextProtocol.Protocol.Messages; using ModelContextProtocol.Protocol.Types; using var tracerProvider = Sdk.CreateTracerProviderBuilder() @@ -22,32 +20,21 @@ .AddMeter("*") .AddOtlpExporter() .Build(); - -using var loggerFactory = LoggerFactory.Create(builder => - { - builder.AddOpenTelemetry(opt => - { - opt.IncludeFormattedMessage = true; - opt.IncludeScopes = true; - opt.AddOtlpExporter(); - }); - }); +using var loggerFactory = LoggerFactory.Create(builder => builder.AddOpenTelemetry(opt => opt.AddOtlpExporter())); // Connect to an MCP server -Console.WriteLine("Connecting client to MCP 'aspnetcore' server"); +Console.WriteLine("Connecting client to MCP 'everything' server"); - -// Create an IChatClient. (This shows using OpenAIClient, but it could be any other IChatClient implementation.) +// Create OpenAI client (or any other compatible with IChatClient) // Provide your own OPENAI_API_KEY via an environment variable. -using IChatClient chatClient = - new OpenAIClient(Environment.GetEnvironmentVariable("OPENAI_API_KEY")).GetChatClient("gpt-4o-mini").AsIChatClient() +var openAIClient = new OpenAIClient(Environment.GetEnvironmentVariable("OPENAI_API_KEY")).GetChatClient("gpt-4o-mini"); + +// Create a sampling client. +using IChatClient samplingClient = openAIClient.AsIChatClient() .AsBuilder() - .UseFunctionInvocation() .UseOpenTelemetry(loggerFactory: loggerFactory, configure: o => o.EnableSensitiveData = true) .Build(); -var samplingHandler = chatClient.CreateSamplingHandler(); - var mcpClient = await McpClientFactory.CreateAsync( new StdioClientTransport(new() { @@ -59,10 +46,7 @@ { Capabilities = new ClientCapabilities() { - Sampling = new SamplingCapability() { SamplingHandler = (param, progress, ct) => { - Console.WriteLine(param?.Meta?.ProgressToken); - return samplingHandler(param, progress, ct); - } } + Sampling = new SamplingCapability() { SamplingHandler = samplingClient.CreateSamplingHandler() } }, }, loggerFactory: loggerFactory); @@ -77,6 +61,13 @@ Console.WriteLine(); +// Create an IChatClient that can use the tools. +using IChatClient chatClient = openAIClient.AsIChatClient() + .AsBuilder() + .UseFunctionInvocation() + .UseOpenTelemetry(loggerFactory: loggerFactory, configure: o => o.EnableSensitiveData = true) + .Build(); + // Have a conversation, making all tools available to the LLM. List messages = []; while (true) diff --git a/samples/EverythingServer/EverythingServer.csproj b/samples/EverythingServer/EverythingServer.csproj index 3b3c0e64e..d5046f7eb 100644 --- a/samples/EverythingServer/EverythingServer.csproj +++ b/samples/EverythingServer/EverythingServer.csproj @@ -11,7 +11,6 @@ - diff --git a/samples/EverythingServer/Program.cs b/samples/EverythingServer/Program.cs index e34e753a6..c9bc12729 100644 --- a/samples/EverythingServer/Program.cs +++ b/samples/EverythingServer/Program.cs @@ -8,6 +8,7 @@ using ModelContextProtocol; using ModelContextProtocol.Protocol.Types; using ModelContextProtocol.Server; +using OpenTelemetry; using OpenTelemetry.Logs; using OpenTelemetry.Metrics; using OpenTelemetry.Resources; @@ -190,21 +191,12 @@ await ctx.Server.RequestSamplingAsync([ return new EmptyResult(); }); - -var resource = ResourceBuilder.CreateEmpty().AddService("mcp.server"); +ResourceBuilder resource = ResourceBuilder.CreateDefault().AddService("everything-server"); builder.Services.AddOpenTelemetry() - .WithTracing(b => b.SetResourceBuilder(resource) - .AddOtlpExporter() - .AddSource("*") - .AddAspNetCoreInstrumentation() - .AddHttpClientInstrumentation()) - .WithMetrics(b => b.SetResourceBuilder(resource) - .AddMeter("*") - .AddOtlpExporter() - .AddAspNetCoreInstrumentation() - .AddHttpClientInstrumentation()) - .WithLogging(b => b.SetResourceBuilder(resource) - .AddOtlpExporter()); + .WithTracing(b => b.AddSource("*").AddHttpClientInstrumentation().SetResourceBuilder(resource)) + .WithMetrics(b => b.AddMeter("*").AddHttpClientInstrumentation().SetResourceBuilder(resource)) + .WithLogging(b => b.SetResourceBuilder(resource)) + .UseOtlpExporter(); builder.Services.AddSingleton(subscriptions); builder.Services.AddHostedService(); diff --git a/src/ModelContextProtocol/Diagnostics.cs b/src/ModelContextProtocol/Diagnostics.cs index 5b4e31f4d..84319412f 100644 --- a/src/ModelContextProtocol/Diagnostics.cs +++ b/src/ModelContextProtocol/Diagnostics.cs @@ -1,5 +1,8 @@ using System.Diagnostics; using System.Diagnostics.Metrics; +using System.Text.Json; +using System.Text.Json.Nodes; +using ModelContextProtocol.Protocol.Messages; namespace ModelContextProtocol; @@ -34,4 +37,79 @@ internal static Histogram CreateDurationHistogram(string name, string de HistogramBucketBoundaries = [0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 30, 60, 120, 300], }; #endif + + internal static ActivityContext ExtractActivityContext(this DistributedContextPropagator propagator, IJsonRpcMessage message) + { + string? traceparent = null; + string? tracestate = null; + propagator?.ExtractTraceIdAndState(message, ExtractContext, out traceparent, out tracestate); + ActivityContext.TryParse(traceparent, tracestate, true, out var activityContext); + return activityContext; + } + + private static void ExtractContext(object? message, string fieldName, out string? fieldValue, out IEnumerable? fieldValues) + { + fieldValues = null; + fieldValue = null; + + JsonNode? parameters = null; + switch (message) + { + case JsonRpcRequest request: + parameters = request.Params; + break; + + case JsonRpcNotification notification: + parameters = notification.Params; + break; + + default: + break; + } + + if (parameters?[fieldName] is JsonValue value && value.GetValueKind() == JsonValueKind.String) + { + fieldValue = value.GetValue(); + } + } + + internal static void InjectActivityContext(this DistributedContextPropagator propagator, Activity? activity, IJsonRpcMessage message) + { + // noop if activity is null + propagator?.Inject(activity, message, InjectContext); + } + + private static void InjectContext(object? message, string key, string value) + { + JsonNode? parameters = null; + switch (message) + { + case JsonRpcRequest request: + parameters = request.Params; + break; + + case JsonRpcNotification notification: + parameters = notification.Params; + break; + + default: + break; + } + + if (parameters is JsonObject jsonObject && jsonObject[key] == null) + { + jsonObject[key] = value; + } + } + + internal static bool ShouldInstrumentMessage(IJsonRpcMessage message) => + ActivitySource.HasListeners() && + message switch + { + JsonRpcRequest => true, + JsonRpcNotification notification => notification.Method != NotificationMethods.LoggingMessageNotification, + _ => false + }; + + internal static ActivityLink[] ActivityLinkFromCurrent() => Activity.Current is null ? [] : [new ActivityLink(Activity.Current.Context)]; } diff --git a/src/ModelContextProtocol/Shared/McpSession.cs b/src/ModelContextProtocol/Shared/McpSession.cs index 6b59e76a2..0b42a9b06 100644 --- a/src/ModelContextProtocol/Shared/McpSession.cs +++ b/src/ModelContextProtocol/Shared/McpSession.cs @@ -10,7 +10,6 @@ using System.Diagnostics.Metrics; using System.Text.Json; using System.Text.Json.Nodes; -using System.Threading.Channels; namespace ModelContextProtocol.Shared; @@ -24,9 +23,9 @@ internal sealed class McpSession : IDisposable private static readonly Histogram s_serverSessionDuration = Diagnostics.CreateDurationHistogram( "mcp.server.session.duration", "Measures the duration of a server session.", longBuckets: true); private static readonly Histogram s_clientOperationDuration = Diagnostics.CreateDurationHistogram( - "mcp.client.operation.duration", "Measures the duration of outbound message exchange.", longBuckets: false); + "mcp.client.operation.duration", "Measures the duration of outbound message.", longBuckets: false); private static readonly Histogram s_serverOperationDuration = Diagnostics.CreateDurationHistogram( - "rpc.server.operation.duration", "Measures the duration of inbound message processing.", longBuckets: false); + "mcp.server.operation.duration", "Measures the duration of inbound message processing.", longBuckets: false); private readonly bool _isServer; private readonly string _transportKind; @@ -191,14 +190,12 @@ private async Task HandleMessageAsync(IJsonRpcMessage message, CancellationToken long? startingTimestamp = durationMetric.Enabled ? Stopwatch.GetTimestamp() : null; - // TODO: there is a chance that we have current activity from transport - link it to - // the new one - Activity? activity = ShouldInstrument(message) && Diagnostics.ActivitySource.HasListeners()? + Activity? activity = Diagnostics.ShouldInstrumentMessage(message)? Diagnostics.ActivitySource.StartActivity( CreateActivityName(method), ActivityKind.Server, - parentContext: ExtractActivityContext(message), - links: FromCurrent()) : + parentContext: _propagator?.ExtractActivityContext(message) ?? default, + links: Diagnostics.ActivityLinkFromCurrent()) : null; TagList tags = default; @@ -213,7 +210,8 @@ private async Task HandleMessageAsync(IJsonRpcMessage message, CancellationToken switch (message) { case JsonRpcRequest request: - await HandleRequest(request, cancellationToken).ConfigureAwait(false); + var result = await HandleRequest(request, cancellationToken).ConfigureAwait(false); + AddResponseTags(ref tags, activity, result, method); break; case JsonRpcNotification notification: @@ -281,7 +279,7 @@ private void HandleMessageWithId(IJsonRpcMessage message, IJsonRpcMessageWithId } } - private async Task HandleRequest(JsonRpcRequest request, CancellationToken cancellationToken) + private async Task HandleRequest(JsonRpcRequest request, CancellationToken cancellationToken) { if (!_requestHandlers.TryGetValue(request.Method, out var handler)) { @@ -298,6 +296,8 @@ await _transport.SendMessageAsync(new JsonRpcResponse JsonRpc = "2.0", Result = result }, cancellationToken).ConfigureAwait(false); + + return result; } private CancellationTokenRegistration RegisterCancellation(CancellationToken cancellationToken, RequestId requestId) @@ -348,8 +348,8 @@ public async Task SendRequestAsync(JsonRpcRequest request, Canc string method = request.Method; long? startingTimestamp = durationMetric.Enabled ? Stopwatch.GetTimestamp() : null; - using Activity? activity = ShouldInstrument(request) && Diagnostics.ActivitySource.HasListeners() ? - Diagnostics.ActivitySource.StartActivity(CreateActivityName(method)) : + using Activity? activity = Diagnostics.ShouldInstrumentMessage(request) ? + Diagnostics.ActivitySource.StartActivity(CreateActivityName(method), ActivityKind.Client) : null; // Set request ID @@ -358,8 +358,7 @@ public async Task SendRequestAsync(JsonRpcRequest request, Canc request.Id = new RequestId($"{_id}-{Interlocked.Increment(ref _nextRequestId)}"); } - // propagate trace context, noop if activity is null - _propagator?.Inject(activity, request, InjectContext); + _propagator?.InjectActivityContext(activity, request); TagList tags = default; bool addTags = activity is { IsAllDataRequested: true } || startingTimestamp is not null; @@ -402,8 +401,9 @@ public async Task SendRequestAsync(JsonRpcRequest request, Canc if (response is JsonRpcResponse success) { - if (addTags) { - MaybeAddErrorTags(ref tags, activity, success); + if (addTags) + { + AddResponseTags(ref tags, activity, success.Result, method); } _logger.RequestResponseReceivedPayload(EndpointName, success.Result?.ToJsonString() ?? "null"); @@ -443,15 +443,15 @@ public async Task SendMessageAsync(IJsonRpcMessage message, CancellationToken ca string method = GetMethodName(message); long? startingTimestamp = durationMetric.Enabled ? Stopwatch.GetTimestamp() : null; - using Activity? activity = ShouldInstrument(message) && Diagnostics.ActivitySource.HasListeners() ? - Diagnostics.ActivitySource.StartActivity(CreateActivityName(method)) : + using Activity? activity = Diagnostics.ShouldInstrumentMessage(message) ? + Diagnostics.ActivitySource.StartActivity(CreateActivityName(method), ActivityKind.Client) : null; TagList tags = default; bool addTags = activity is { IsAllDataRequested: true } || startingTimestamp is not null; // propagate trace context, noop if activity is null - _propagator?.Inject(activity, message, InjectContext); + _propagator?.InjectActivityContext(activity, message); try { @@ -513,8 +513,7 @@ private static string GetMethodName(IJsonRpcMessage message) => private void AddTags(ref TagList tags, Activity? activity, IJsonRpcMessage message, string method) { - tags.Add("rpc.system", "jsonrpc"); - tags.Add("rpc.method", method); + tags.Add("mcp.method.name", method); tags.Add("network.transport", _transportKind); // RPC convention also includes: @@ -525,8 +524,9 @@ private void AddTags(ref TagList tags, Activity? activity, IJsonRpcMessage messa // session and request id have high cardinality, so not applying to metric tags activity.AddTag("mcp.session.id", _id); - if (message is IJsonRpcMessageWithId withId) { - activity.AddTag("rpc.jsonrpc.request_id", withId.Id.Id?.ToString()); + if (message is IJsonRpcMessageWithId withId) + { + activity.AddTag("mcp.request.id", withId.Id.Id?.ToString()); } } @@ -588,11 +588,14 @@ private static void AddExceptionTags(ref TagList tags, Activity? activity, Excep e = ae.InnerException; } - tags.Add("error.type", e.GetType().FullName); - tags.Add("rpc.jsonrpc.error_code", - (e as McpException)?.ErrorCode is int errorCode ? errorCode : - e is JsonException ? ErrorCodes.ParseError : - ErrorCodes.InternalError); + int? intErrorCode = (e as McpException)?.ErrorCode is int errorCode ? errorCode : + e is JsonException ? ErrorCodes.ParseError : null; + + tags.Add("error.type", intErrorCode == null ? e.GetType().FullName : intErrorCode.ToString()); + if (intErrorCode is not null) + { + tags.Add("rpc.jsonrpc.error_code", intErrorCode.ToString()); + } if (activity is { IsAllDataRequested: true }) { @@ -600,21 +603,23 @@ private static void AddExceptionTags(ref TagList tags, Activity? activity, Excep } } - private static void MaybeAddErrorTags(ref TagList tags, Activity? activity, JsonRpcResponse response) + private static void AddResponseTags(ref TagList tags, Activity? activity, JsonNode? response, string method) { - if (response.Result is JsonObject jsonObject + if (response is JsonObject jsonObject && jsonObject.TryGetPropertyValue("isError", out var isError) && isError?.GetValueKind() == JsonValueKind.True) { - if (activity is { IsAllDataRequested: true }) { + if (activity is { IsAllDataRequested: true }) + { string? content = null; - if (jsonObject.TryGetPropertyValue("content", out var prop) && prop!= null) { + if (jsonObject.TryGetPropertyValue("content", out var prop) && prop!= null) + { content = prop.ToJsonString(); } activity.SetStatus(ActivityStatusCode.Error, content); } - tags.Add("error.type", "_OTHER"); + tags.Add("error.type", method == RequestMethods.ToolsCall ? "tool_error" : "_OTHER"); } } @@ -673,60 +678,6 @@ private static TimeSpan GetElapsed(long startingTimestamp) => new((long)(s_timestampToTicks * (Stopwatch.GetTimestamp() - startingTimestamp))); #endif - private ActivityContext ExtractActivityContext(IJsonRpcMessage message) { - string? traceparent = null; - string? tracestate = null; - _propagator?.ExtractTraceIdAndState(message, ExtractContext, out traceparent, out tracestate); - ActivityContext.TryParse(traceparent, tracestate, true, out var activityContext); - return activityContext; - } - - private static void ExtractContext(object? message, string fieldName, out string? fieldValue, out IEnumerable? fieldValues) { - fieldValues = null; - fieldValue = null; - - JsonNode? parameters = null; - switch (message) - { - case JsonRpcRequest request: - parameters = request.Params; - break; - - case JsonRpcNotification notification: - parameters = notification.Params; - break; - - default: - break; - } - - if (parameters?[fieldName] is JsonValue value && value.GetValueKind() == JsonValueKind.String) { - fieldValue = value.GetValue(); - } - } - - private static void InjectContext(object? message, string key, string value) - { - JsonNode? parameters = null; - switch (message) - { - case JsonRpcRequest request: - parameters = request.Params; - break; - - case JsonRpcNotification notification: - parameters = notification.Params; - break; - - default: - break; - } - - if (parameters is not null && parameters is JsonObject jsonObject && jsonObject[key] == null) { - jsonObject[key] = value; - } - } - private static string? GetStringProperty(JsonObject parameters, string propName) { if (parameters.TryGetPropertyValue(propName, out var prop) && prop?.GetValueKind() is JsonValueKind.String) @@ -736,20 +687,4 @@ private static void InjectContext(object? message, string key, string value) return null; } - - private bool ShouldInstrument(IJsonRpcMessage message) => - message switch - { - JsonRpcRequest request => true, - JsonRpcNotification notification => notification.Method != NotificationMethods.LoggingMessageNotification, - _ => false - }; - - private static ActivityLink[] FromCurrent() { - if (Activity.Current is { } activity) { - return [new ActivityLink(activity.Context)]; - } - - return Array.Empty(); - } } diff --git a/tests/ModelContextProtocol.Tests/DiagnosticTests.cs b/tests/ModelContextProtocol.Tests/DiagnosticTests.cs index fdd3e81f8..6e9f9a62d 100644 --- a/tests/ModelContextProtocol.Tests/DiagnosticTests.cs +++ b/tests/ModelContextProtocol.Tests/DiagnosticTests.cs @@ -4,6 +4,7 @@ using OpenTelemetry.Trace; using System.Diagnostics; using System.IO.Pipelines; +using System.Reflection.Metadata; namespace ModelContextProtocol.Tests; @@ -33,9 +34,95 @@ await RunConnected(async (client, server) => Assert.NotEmpty(activities); - Activity toolCallActivity = activities.First(a => - a.Tags.Any(t => t.Key == "rpc.method" && t.Value == "tools/call")); - Assert.Equal("DoubleValue", toolCallActivity.Tags.First(t => t.Key == "mcp.request.params.name").Value); + var clientToolCall = Assert.Single(activities, a => + a.Tags.Any(t => t.Key == "mcp.tool.name" && t.Value == "DoubleValue") && + a.Tags.Any(t => t.Key == "mcp.method.name" && t.Value == "tools/call") && + a.DisplayName == "tools/call DoubleValue" && + a.Kind == ActivityKind.Client && + a.Status == ActivityStatusCode.Unset); + + var serverToolCall = Assert.Single(activities, a => + a.Tags.Any(t => t.Key == "mcp.tool.name" && t.Value == "DoubleValue") && + a.Tags.Any(t => t.Key == "mcp.method.name" && t.Value == "tools/call") && + a.DisplayName == "tools/call DoubleValue" && + a.Kind == ActivityKind.Server && + a.Status == ActivityStatusCode.Unset); + + Assert.Equal(clientToolCall.SpanId, serverToolCall.ParentSpanId); + Assert.Equal(clientToolCall.TraceId, serverToolCall.TraceId); + + var clientListToolsCall = Assert.Single(activities, a => + a.Tags.Any(t => t.Key == "mcp.method.name" && t.Value == "tools/list") && + a.DisplayName == "tools/list" && + a.Kind == ActivityKind.Client && + a.Status == ActivityStatusCode.Unset); + + var serverListToolsCall = Assert.Single(activities, a => + a.Tags.Any(t => t.Key == "mcp.method.name" && t.Value == "tools/list") && + a.DisplayName == "tools/list" && + a.Kind == ActivityKind.Server && + a.Status == ActivityStatusCode.Unset); + + Assert.Equal(clientListToolsCall.SpanId, serverListToolsCall.ParentSpanId); + Assert.Equal(clientListToolsCall.TraceId, serverListToolsCall.TraceId); + } + + [Fact] + public async Task Session_FailedToolCall() + { + var activities = new List(); + + using (var tracerProvider = OpenTelemetry.Sdk.CreateTracerProviderBuilder() + .AddSource("Experimental.ModelContextProtocol") + .AddInMemoryExporter(activities) + .Build()) + { + await RunConnected(async (client, server) => + { + await client.CallToolAsync("Throw", cancellationToken: TestContext.Current.CancellationToken); + await Assert.ThrowsAsync(() => client.CallToolAsync("does-not-exist", cancellationToken: TestContext.Current.CancellationToken)); + }); + } + + Assert.NotEmpty(activities); + + var throwToolClient = Assert.Single(activities, a => + a.Tags.Any(t => t.Key == "mcp.tool.name" && t.Value == "Throw") && + a.Tags.Any(t => t.Key == "mcp.method.name" && t.Value == "tools/call") && + a.DisplayName == "tools/call Throw" && + a.Kind == ActivityKind.Client); + + Assert.Equal(ActivityStatusCode.Error, throwToolClient.Status); + Assert.Equal("[{\"type\":\"text\",\"text\":\"boom\"}]", throwToolClient.StatusDescription); + + var throwToolServer = Assert.Single(activities, a => + a.Tags.Any(t => t.Key == "mcp.tool.name" && t.Value == "Throw") && + a.Tags.Any(t => t.Key == "mcp.method.name" && t.Value == "tools/call") && + a.DisplayName == "tools/call Throw" && + a.Kind == ActivityKind.Server); + + Assert.Equal(ActivityStatusCode.Error, throwToolServer.Status); + Assert.Equal("[{\"type\":\"text\",\"text\":\"boom\"}]", throwToolServer.StatusDescription); + + var doesNotExistToolClient = Assert.Single(activities, a => + a.Tags.Any(t => t.Key == "mcp.tool.name" && t.Value == "does-not-exist") && + a.Tags.Any(t => t.Key == "mcp.method.name" && t.Value == "tools/call") && + a.DisplayName == "tools/call does-not-exist" && + a.Kind == ActivityKind.Client); + + Assert.Equal(ActivityStatusCode.Error, doesNotExistToolClient.Status); + Assert.Equal("Request failed (server side): Unknown tool 'does-not-exist'", doesNotExistToolClient.StatusDescription); + Assert.Equal("-32603", doesNotExistToolClient.Tags.Single(t => t.Key == "rpc.jsonrpc.error_code").Value); + + var doesNotExistToolServer = Assert.Single(activities, a => + a.Tags.Any(t => t.Key == "mcp.tool.name" && t.Value == "does-not-exist") && + a.Tags.Any(t => t.Key == "mcp.method.name" && t.Value == "tools/call") && + a.DisplayName == "tools/call does-not-exist" && + a.Kind == ActivityKind.Server); + + Assert.Equal(ActivityStatusCode.Error, doesNotExistToolServer.Status); + Assert.Equal("Request failed (server side): Unknown tool 'does-not-exist'", doesNotExistToolClient.StatusDescription); + Assert.Equal("-32603", doesNotExistToolClient.Tags.Single(t => t.Key == "rpc.jsonrpc.error_code").Value); } private static async Task RunConnected(Func action) @@ -52,7 +139,10 @@ private static async Task RunConnected(Func action { Tools = new() { - ToolCollection = [McpServerTool.Create((int amount) => amount * 2, new() { Name = "DoubleValue", Description = "Doubles the value." })], + ToolCollection = [ + McpServerTool.Create((int amount) => amount * 2, new() { Name = "DoubleValue", Description = "Doubles the value." }), + McpServerTool.Create(() => { throw new Exception("boom"); }, new() { Name = "Throw", Description = "Throws error." }), + ], } } })) From 661b3d2bb622cf764ed25ccef291ce7731a6c3ee Mon Sep 17 00:00:00 2001 From: Liudmila Molkova Date: Fri, 11 Apr 2025 17:28:20 -0700 Subject: [PATCH 4/9] more nits --- samples/ChatWithTools/Program.cs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/samples/ChatWithTools/Program.cs b/samples/ChatWithTools/Program.cs index cfb5d0c35..a2f94c916 100644 --- a/samples/ChatWithTools/Program.cs +++ b/samples/ChatWithTools/Program.cs @@ -8,7 +8,6 @@ using Microsoft.Extensions.Logging; using OpenTelemetry.Logs; using OpenTelemetry.Metrics; -using ModelContextProtocol.Protocol.Types; using var tracerProvider = Sdk.CreateTracerProviderBuilder() .AddHttpClientInstrumentation() @@ -42,12 +41,9 @@ Arguments = ["-y", "--verbose", "@modelcontextprotocol/server-everything"], Name = "Everything", }), - clientOptions: new McpClientOptions() + clientOptions: new() { - Capabilities = new ClientCapabilities() - { - Sampling = new SamplingCapability() { SamplingHandler = samplingClient.CreateSamplingHandler() } - }, + Capabilities = new() { Sampling = new() { SamplingHandler = samplingClient.CreateSamplingHandler() } }, }, loggerFactory: loggerFactory); From d63eb340e55618a1ca6c3e035a583d6c56f1d1a1 Mon Sep 17 00:00:00 2001 From: Liudmila Molkova Date: Fri, 11 Apr 2025 17:53:00 -0700 Subject: [PATCH 5/9] more nits --- src/ModelContextProtocol/Shared/McpSession.cs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/ModelContextProtocol/Shared/McpSession.cs b/src/ModelContextProtocol/Shared/McpSession.cs index 0b42a9b06..d020decc2 100644 --- a/src/ModelContextProtocol/Shared/McpSession.cs +++ b/src/ModelContextProtocol/Shared/McpSession.cs @@ -516,8 +516,7 @@ private void AddTags(ref TagList tags, Activity? activity, IJsonRpcMessage messa tags.Add("mcp.method.name", method); tags.Add("network.transport", _transportKind); - // RPC convention also includes: - // server.address, server.port, client.address, client.port, network.peer.address, network.peer.port, network.type + // MCP convention also includes: server.address, server.port, client.address, client.port if (activity is { IsAllDataRequested: true }) { @@ -553,6 +552,7 @@ private void AddTags(ref TagList tags, Activity? activity, IJsonRpcMessage messa target = toolName; } break; + case RequestMethods.PromptsGet: string? promptName = GetStringProperty(paramsObj, "name"); if (promptName is not null) @@ -616,6 +616,7 @@ private static void AddResponseTags(ref TagList tags, Activity? activity, JsonNo { content = prop.ToJsonString(); } + activity.SetStatus(ActivityStatusCode.Error, content); } @@ -654,7 +655,8 @@ public void Dispose() { TagList tags = default; tags.Add("network.transport", _transportKind); - // TODO server.address, server.port, client.address, client.port, network.peer.address, network.peer.port, network.type + + // MCP convention also includes: server.address, server.port, client.address, client.port durationMetric.Record(GetElapsed(_sessionStartingTimestamp).TotalSeconds, tags); } From 2ad3362269482926c7e82ee11612f6468cd4e6e0 Mon Sep 17 00:00:00 2001 From: Liudmila Molkova Date: Fri, 11 Apr 2025 18:04:08 -0700 Subject: [PATCH 6/9] more nits --- samples/ChatWithTools/Program.cs | 3 +-- src/ModelContextProtocol/Shared/McpSession.cs | 3 +-- tests/ModelContextProtocol.Tests/DiagnosticTests.cs | 5 ----- 3 files changed, 2 insertions(+), 9 deletions(-) diff --git a/samples/ChatWithTools/Program.cs b/samples/ChatWithTools/Program.cs index a2f94c916..8c5ae8237 100644 --- a/samples/ChatWithTools/Program.cs +++ b/samples/ChatWithTools/Program.cs @@ -69,11 +69,10 @@ while (true) { Console.Write("Q: "); - messages.Add(new(ChatRole.User, Console.ReadLine())); List updates = []; - await foreach (var update in chatClient.GetStreamingResponseAsync(messages, new() { Tools = [.. tools]})) + await foreach (var update in chatClient.GetStreamingResponseAsync(messages, new() { Tools = [.. tools] })) { Console.Write(update); updates.Add(update); diff --git a/src/ModelContextProtocol/Shared/McpSession.cs b/src/ModelContextProtocol/Shared/McpSession.cs index d020decc2..2465cc4f1 100644 --- a/src/ModelContextProtocol/Shared/McpSession.cs +++ b/src/ModelContextProtocol/Shared/McpSession.cs @@ -450,7 +450,7 @@ public async Task SendMessageAsync(IJsonRpcMessage message, CancellationToken ca TagList tags = default; bool addTags = activity is { IsAllDataRequested: true } || startingTimestamp is not null; - // propagate trace context, noop if activity is null + // propagate trace context _propagator?.InjectActivityContext(activity, message); try @@ -507,7 +507,6 @@ private static string GetMethodName(IJsonRpcMessage message) => { JsonRpcRequest request => request.Method, JsonRpcNotification notification => notification.Method, - not null => message.GetType().FullName ?? "unknownMethod", _ => "unknownMethod" }; diff --git a/tests/ModelContextProtocol.Tests/DiagnosticTests.cs b/tests/ModelContextProtocol.Tests/DiagnosticTests.cs index 6e9f9a62d..1d1689140 100644 --- a/tests/ModelContextProtocol.Tests/DiagnosticTests.cs +++ b/tests/ModelContextProtocol.Tests/DiagnosticTests.cs @@ -4,7 +4,6 @@ using OpenTelemetry.Trace; using System.Diagnostics; using System.IO.Pipelines; -using System.Reflection.Metadata; namespace ModelContextProtocol.Tests; @@ -93,7 +92,6 @@ await RunConnected(async (client, server) => a.Kind == ActivityKind.Client); Assert.Equal(ActivityStatusCode.Error, throwToolClient.Status); - Assert.Equal("[{\"type\":\"text\",\"text\":\"boom\"}]", throwToolClient.StatusDescription); var throwToolServer = Assert.Single(activities, a => a.Tags.Any(t => t.Key == "mcp.tool.name" && t.Value == "Throw") && @@ -102,7 +100,6 @@ await RunConnected(async (client, server) => a.Kind == ActivityKind.Server); Assert.Equal(ActivityStatusCode.Error, throwToolServer.Status); - Assert.Equal("[{\"type\":\"text\",\"text\":\"boom\"}]", throwToolServer.StatusDescription); var doesNotExistToolClient = Assert.Single(activities, a => a.Tags.Any(t => t.Key == "mcp.tool.name" && t.Value == "does-not-exist") && @@ -111,7 +108,6 @@ await RunConnected(async (client, server) => a.Kind == ActivityKind.Client); Assert.Equal(ActivityStatusCode.Error, doesNotExistToolClient.Status); - Assert.Equal("Request failed (server side): Unknown tool 'does-not-exist'", doesNotExistToolClient.StatusDescription); Assert.Equal("-32603", doesNotExistToolClient.Tags.Single(t => t.Key == "rpc.jsonrpc.error_code").Value); var doesNotExistToolServer = Assert.Single(activities, a => @@ -121,7 +117,6 @@ await RunConnected(async (client, server) => a.Kind == ActivityKind.Server); Assert.Equal(ActivityStatusCode.Error, doesNotExistToolServer.Status); - Assert.Equal("Request failed (server side): Unknown tool 'does-not-exist'", doesNotExistToolClient.StatusDescription); Assert.Equal("-32603", doesNotExistToolClient.Tags.Single(t => t.Key == "rpc.jsonrpc.error_code").Value); } From f2f085b276154dcb0c91eb4e60e7b73841e273ef Mon Sep 17 00:00:00 2001 From: Liudmila Molkova Date: Sat, 12 Apr 2025 17:37:48 -0700 Subject: [PATCH 7/9] address feedback --- src/ModelContextProtocol/Diagnostics.cs | 6 +-- src/ModelContextProtocol/Shared/McpSession.cs | 39 +++++++------------ 2 files changed, 16 insertions(+), 29 deletions(-) diff --git a/src/ModelContextProtocol/Diagnostics.cs b/src/ModelContextProtocol/Diagnostics.cs index 84319412f..a5d293f0c 100644 --- a/src/ModelContextProtocol/Diagnostics.cs +++ b/src/ModelContextProtocol/Diagnostics.cs @@ -40,9 +40,7 @@ internal static Histogram CreateDurationHistogram(string name, string de internal static ActivityContext ExtractActivityContext(this DistributedContextPropagator propagator, IJsonRpcMessage message) { - string? traceparent = null; - string? tracestate = null; - propagator?.ExtractTraceIdAndState(message, ExtractContext, out traceparent, out tracestate); + propagator.ExtractTraceIdAndState(message, ExtractContext, out var traceparent, out var tracestate); ActivityContext.TryParse(traceparent, tracestate, true, out var activityContext); return activityContext; } @@ -76,7 +74,7 @@ private static void ExtractContext(object? message, string fieldName, out string internal static void InjectActivityContext(this DistributedContextPropagator propagator, Activity? activity, IJsonRpcMessage message) { // noop if activity is null - propagator?.Inject(activity, message, InjectContext); + propagator.Inject(activity, message, InjectContext); } private static void InjectContext(object? message, string key, string value) diff --git a/src/ModelContextProtocol/Shared/McpSession.cs b/src/ModelContextProtocol/Shared/McpSession.cs index 2465cc4f1..25ce8692b 100644 --- a/src/ModelContextProtocol/Shared/McpSession.cs +++ b/src/ModelContextProtocol/Shared/McpSession.cs @@ -34,7 +34,7 @@ internal sealed class McpSession : IDisposable private readonly NotificationHandlers _notificationHandlers; private readonly long _sessionStartingTimestamp = Stopwatch.GetTimestamp(); - private readonly DistributedContextPropagator? _propagator = DistributedContextPropagator.Current; + private readonly DistributedContextPropagator _propagator = DistributedContextPropagator.Current; /// Collection of requests sent on this session and waiting for responses. private readonly ConcurrentDictionary> _pendingRequests = []; @@ -190,11 +190,11 @@ private async Task HandleMessageAsync(IJsonRpcMessage message, CancellationToken long? startingTimestamp = durationMetric.Enabled ? Stopwatch.GetTimestamp() : null; - Activity? activity = Diagnostics.ShouldInstrumentMessage(message)? + Activity? activity = Diagnostics.ShouldInstrumentMessage(message) ? Diagnostics.ActivitySource.StartActivity( CreateActivityName(method), ActivityKind.Server, - parentContext: _propagator?.ExtractActivityContext(message) ?? default, + parentContext: _propagator.ExtractActivityContext(message), links: Diagnostics.ActivityLinkFromCurrent()) : null; @@ -358,7 +358,7 @@ public async Task SendRequestAsync(JsonRpcRequest request, Canc request.Id = new RequestId($"{_id}-{Interlocked.Increment(ref _nextRequestId)}"); } - _propagator?.InjectActivityContext(activity, request); + _propagator.InjectActivityContext(activity, request); TagList tags = default; bool addTags = activity is { IsAllDataRequested: true } || startingTimestamp is not null; @@ -515,8 +515,9 @@ private void AddTags(ref TagList tags, Activity? activity, IJsonRpcMessage messa tags.Add("mcp.method.name", method); tags.Add("network.transport", _transportKind); - // MCP convention also includes: server.address, server.port, client.address, client.port - + // TODO (lmolkova), when using SSE transport, add: + // - server.address and server.port on client spans and metrics + // - client.address and client.port on server spans (not metrics because of cardinality) when using SSE transport if (activity is { IsAllDataRequested: true }) { // session and request id have high cardinality, so not applying to metric tags @@ -544,20 +545,11 @@ private void AddTags(ref TagList tags, Activity? activity, IJsonRpcMessage messa switch (method) { case RequestMethods.ToolsCall: - string? toolName = GetStringProperty(paramsObj, "name"); - if (toolName is not null) - { - tags.Add("mcp.tool.name", toolName); - target = toolName; - } - break; - case RequestMethods.PromptsGet: - string? promptName = GetStringProperty(paramsObj, "name"); - if (promptName is not null) + target = GetStringProperty(paramsObj, "name"); + if (target is not null) { - tags.Add("mcp.prompt.name", promptName); - target = promptName; + tags.Add(method == RequestMethods.ToolsCall ? "mcp.tool.name" : "mcp.prompt.name", target); } break; @@ -565,11 +557,10 @@ private void AddTags(ref TagList tags, Activity? activity, IJsonRpcMessage messa case RequestMethods.ResourcesSubscribe: case RequestMethods.ResourcesUnsubscribe: case NotificationMethods.ResourceUpdatedNotification: - string? resourceUri = GetStringProperty(paramsObj, "uri"); - if (resourceUri is not null) + target = GetStringProperty(paramsObj, "uri"); + if (target is not null) { - tags.Add("mcp.resource.uri", resourceUri); - target = resourceUri; + tags.Add("mcp.resource.uri", target); } break; } @@ -611,7 +602,7 @@ private static void AddResponseTags(ref TagList tags, Activity? activity, JsonNo if (activity is { IsAllDataRequested: true }) { string? content = null; - if (jsonObject.TryGetPropertyValue("content", out var prop) && prop!= null) + if (jsonObject.TryGetPropertyValue("content", out var prop) && prop != null) { content = prop.ToJsonString(); } @@ -654,8 +645,6 @@ public void Dispose() { TagList tags = default; tags.Add("network.transport", _transportKind); - - // MCP convention also includes: server.address, server.port, client.address, client.port durationMetric.Record(GetElapsed(_sessionStartingTimestamp).TotalSeconds, tags); } From cf45d2d9edc49a2f4f497a449b7d24dafd5a7815 Mon Sep 17 00:00:00 2001 From: Liudmila Molkova Date: Sat, 12 Apr 2025 17:46:01 -0700 Subject: [PATCH 8/9] add comment on session duration --- src/ModelContextProtocol/Shared/McpSession.cs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/ModelContextProtocol/Shared/McpSession.cs b/src/ModelContextProtocol/Shared/McpSession.cs index 25ce8692b..09ea00136 100644 --- a/src/ModelContextProtocol/Shared/McpSession.cs +++ b/src/ModelContextProtocol/Shared/McpSession.cs @@ -645,6 +645,9 @@ public void Dispose() { TagList tags = default; tags.Add("network.transport", _transportKind); + + // TODO (lmolkova): add server.address and server.port on client-side when using SSE transport, + // client.* attributes are not added to metrics because of cardinality durationMetric.Record(GetElapsed(_sessionStartingTimestamp).TotalSeconds, tags); } From 0bccb94e9aeaf19e302455390724d59feba8e962 Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Sat, 12 Apr 2025 22:04:07 -0400 Subject: [PATCH 9/9] Fix build --- src/ModelContextProtocol/Shared/McpSession.cs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/ModelContextProtocol/Shared/McpSession.cs b/src/ModelContextProtocol/Shared/McpSession.cs index 09ea00136..afbcecabd 100644 --- a/src/ModelContextProtocol/Shared/McpSession.cs +++ b/src/ModelContextProtocol/Shared/McpSession.cs @@ -10,6 +10,9 @@ using System.Diagnostics.Metrics; using System.Text.Json; using System.Text.Json.Nodes; +#if !NET +using System.Threading.Channels; +#endif namespace ModelContextProtocol.Shared; @@ -515,7 +518,7 @@ private void AddTags(ref TagList tags, Activity? activity, IJsonRpcMessage messa tags.Add("mcp.method.name", method); tags.Add("network.transport", _transportKind); - // TODO (lmolkova), when using SSE transport, add: + // TODO: When using SSE transport, add: // - server.address and server.port on client spans and metrics // - client.address and client.port on server spans (not metrics because of cardinality) when using SSE transport if (activity is { IsAllDataRequested: true }) @@ -646,7 +649,7 @@ public void Dispose() TagList tags = default; tags.Add("network.transport", _transportKind); - // TODO (lmolkova): add server.address and server.port on client-side when using SSE transport, + // TODO: Add server.address and server.port on client-side when using SSE transport, // client.* attributes are not added to metrics because of cardinality durationMetric.Record(GetElapsed(_sessionStartingTimestamp).TotalSeconds, tags); }