Skip to content

Commit 2a728b9

Browse files
committed
Add initial Activity / Metric support
- Creates an Activity around every outgoing send / incoming receive handling - Creates a histogram for each incoming operation, tagged with request/notification details
1 parent 25bcb44 commit 2a728b9

File tree

5 files changed

+195
-15
lines changed

5 files changed

+195
-15
lines changed

Directory.Packages.props

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
<PackageVersion Include="Microsoft.Extensions.Logging.Console" Version="$(MicrosoftExtensionsVersion)" />
3434
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.12.0" />
3535
<PackageVersion Include="Moq" Version="4.20.72" />
36+
<PackageVersion Include="OpenTelemetry" Version="1.11.2" />
37+
<PackageVersion Include="OpenTelemetry.Exporter.InMemory" Version="1.11.2" />
3638
<PackageVersion Include="Serilog.Extensions.Hosting" Version="9.0.0" />
3739
<PackageVersion Include="Serilog.Extensions.Logging" Version="9.0.0" />
3840
<PackageVersion Include="Serilog.Sinks.Console" Version="6.0.0" />

src/ModelContextProtocol/Shared/McpSession.cs

Lines changed: 103 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
using ModelContextProtocol.Utils;
88
using ModelContextProtocol.Utils.Json;
99
using System.Collections.Concurrent;
10+
using System.Diagnostics;
11+
using System.Diagnostics.Metrics;
1012
using System.Text.Json;
1113

1214
namespace ModelContextProtocol.Shared;
@@ -16,6 +18,13 @@ namespace ModelContextProtocol.Shared;
1618
/// </summary>
1719
internal sealed class McpSession : IDisposable
1820
{
21+
private static readonly ActivitySource s_activitySource = new("ModelContextProtocol");
22+
private static readonly Meter s_meter = new("ModelContextProtocol");
23+
private static readonly Histogram<double> s_operationDurationHistogram = s_meter.CreateHistogram<double>(
24+
"modelcontextprotocol.operation.duration",
25+
"s",
26+
"Measures the duration of an operation in seconds.");
27+
1928
private readonly ITransport _transport;
2029
private readonly RequestHandlers _requestHandlers;
2130
private readonly NotificationHandlers _notificationHandlers;
@@ -152,23 +161,89 @@ await _transport.SendMessageAsync(new JsonRpcError
152161

153162
private async Task HandleMessageAsync(IJsonRpcMessage message, CancellationToken cancellationToken)
154163
{
155-
switch (message)
164+
using Activity? activity = s_activitySource.StartActivity("HandlingMessage");
165+
TagList tags = default;
166+
167+
Stopwatch? measuring = s_operationDurationHistogram.Enabled ? Stopwatch.StartNew() : null;
168+
bool addTags = activity is not null || measuring is not null;
169+
170+
try
156171
{
157-
case JsonRpcRequest request:
158-
await HandleRequest(request, cancellationToken).ConfigureAwait(false);
159-
break;
172+
if (addTags)
173+
{
174+
tags.Add("mcp.session.id", _id);
175+
}
176+
177+
switch (message)
178+
{
179+
case JsonRpcRequest request:
180+
if (addTags)
181+
{
182+
tags.Add("mcp.request.id", request.Id.ToString());
183+
tags.Add("mcp.request.method", request.Method);
184+
185+
if (request.Params is JsonElement je)
186+
{
187+
switch (request.Method)
188+
{
189+
case RequestMethods.ToolsCall:
190+
case RequestMethods.PromptsGet:
191+
if (je.TryGetProperty("name", out var prop) && prop.ValueKind == JsonValueKind.String)
192+
{
193+
tags.Add("mcp.request.params.name", prop.GetString());
194+
}
195+
break;
196+
197+
case RequestMethods.ResourcesRead:
198+
if (je.TryGetProperty("uri", out prop) && prop.ValueKind == JsonValueKind.String)
199+
{
200+
tags.Add("mcp.request.params.uri", prop.GetString());
201+
}
202+
break;
203+
}
204+
}
205+
}
206+
207+
await HandleRequest(request, cancellationToken).ConfigureAwait(false);
208+
break;
160209

161-
case IJsonRpcMessageWithId messageWithId:
162-
HandleMessageWithId(message, messageWithId);
163-
break;
210+
case JsonRpcNotification notification:
211+
if (addTags)
212+
{
213+
tags.Add("mcp.notification.method", notification.Method);
214+
}
215+
216+
await HandleNotification(notification).ConfigureAwait(false);
217+
break;
164218

165-
case JsonRpcNotification notification:
166-
await HandleNotification(notification).ConfigureAwait(false);
167-
break;
219+
case IJsonRpcMessageWithId messageWithId:
220+
HandleMessageWithId(message, messageWithId);
221+
break;
168222

169-
default:
170-
_logger.EndpointHandlerUnexpectedMessageType(EndpointName, message.GetType().Name);
171-
break;
223+
default:
224+
_logger.EndpointHandlerUnexpectedMessageType(EndpointName, message.GetType().Name);
225+
break;
226+
}
227+
}
228+
catch (Exception e) when (addTags)
229+
{
230+
tags.Add("error.type", e.GetType().FullName);
231+
throw;
232+
}
233+
finally
234+
{
235+
if (activity is not null)
236+
{
237+
foreach (var tag in tags)
238+
{
239+
activity.AddTag(tag.Key, tag.Value);
240+
}
241+
}
242+
243+
if (measuring is not null)
244+
{
245+
s_operationDurationHistogram.Record(measuring.Elapsed.TotalSeconds, tags);
246+
}
172247
}
173248
}
174249

@@ -264,12 +339,20 @@ public async Task<TResult> SendRequestAsync<TResult>(JsonRpcRequest request, Can
264339
throw new McpClientException("Transport is not connected");
265340
}
266341

342+
using Activity? activity = s_activitySource.StartActivity("SendingRequest");
343+
267344
// Set request ID
268345
if (request.Id.IsDefault)
269346
{
270347
request.Id = new RequestId($"{_id}-{Interlocked.Increment(ref _nextRequestId)}");
271348
}
272349

350+
if (activity is not null)
351+
{
352+
activity.SetTag("mcp.request.id", request.Id.ToString());
353+
activity.SetTag("mcp.request.method", request.Method);
354+
}
355+
273356
var tcs = new TaskCompletionSource<IJsonRpcMessage>(TaskCreationOptions.RunContinuationsAsynchronously);
274357
_pendingRequests[request.Id] = tcs;
275358

@@ -319,6 +402,11 @@ public async Task<TResult> SendRequestAsync<TResult>(JsonRpcRequest request, Can
319402
_logger.RequestInvalidResponseType(EndpointName, request.Method);
320403
throw new McpClientException("Invalid response type");
321404
}
405+
catch (Exception ex) when (activity is not null)
406+
{
407+
activity.AddTag("error.type", ex.GetType().FullName);
408+
throw;
409+
}
322410
finally
323411
{
324412
_pendingRequests.TryRemove(request.Id, out _);
@@ -335,6 +423,8 @@ public async Task SendMessageAsync(IJsonRpcMessage message, CancellationToken ca
335423
throw new McpClientException("Transport is not connected");
336424
}
337425

426+
using Activity? activity = s_activitySource.StartActivity("SendingMessage");
427+
338428
if (_logger.IsEnabled(LogLevel.Debug))
339429
{
340430
_logger.SendingMessage(EndpointName, JsonSerializer.Serialize(message, _jsonOptions.GetTypeInfo<IJsonRpcMessage>()));
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
using ModelContextProtocol.Client;
2+
using ModelContextProtocol.Protocol.Transport;
3+
using ModelContextProtocol.Protocol.Types;
4+
using ModelContextProtocol.Server;
5+
using OpenTelemetry.Trace;
6+
using System.Diagnostics;
7+
using System.IO.Pipelines;
8+
9+
namespace ModelContextProtocol.Tests;
10+
11+
[Collection(nameof(DisableParallelization))]
12+
public class DiagnosticTests
13+
{
14+
[Fact]
15+
public async Task Session_TracksActivities()
16+
{
17+
var activities = new List<Activity>();
18+
19+
using (var tracerProvider = OpenTelemetry.Sdk.CreateTracerProviderBuilder()
20+
.AddSource("ModelContextProtocol")
21+
.AddInMemoryExporter(activities)
22+
.Build())
23+
{
24+
await RunConnected(async (client, server) =>
25+
{
26+
var tools = await client.ListToolsAsync(TestContext.Current.CancellationToken);
27+
Assert.NotNull(tools);
28+
Assert.NotEmpty(tools);
29+
30+
var tool = tools.First(t => t.Name == "DoubleValue");
31+
await tool.InvokeAsync(new Dictionary<string, object?>() { ["amount"] = 42 }, TestContext.Current.CancellationToken);
32+
});
33+
}
34+
35+
Assert.NotEmpty(activities);
36+
37+
Activity toolCallActivity = activities.First(a =>
38+
a.Tags.Any(t => t.Key == "mcp.request.method" && t.Value == "tools/call"));
39+
Assert.Equal("DoubleValue", toolCallActivity.Tags.First(t => t.Key == "mcp.request.params.name").Value);
40+
}
41+
42+
private static async Task RunConnected(Func<IMcpClient, IMcpServer, Task> action)
43+
{
44+
Pipe clientToServerPipe = new(), serverToClientPipe = new();
45+
StreamServerTransport serverTransport = new(clientToServerPipe.Reader.AsStream(), serverToClientPipe.Writer.AsStream());
46+
StreamClientTransport clientTransport = new(clientToServerPipe.Writer.AsStream(), serverToClientPipe.Reader.AsStream());
47+
48+
Task serverTask;
49+
50+
await using (IMcpServer server = McpServerFactory.Create(serverTransport, new()
51+
{
52+
ServerInfo = new Implementation { Name = "TestServer", Version = "1.0.0" },
53+
Capabilities = new()
54+
{
55+
Tools = new()
56+
{
57+
ToolCollection = [McpServerTool.Create((int amount) => amount * 2, new() { Name = "DoubleValue", Description = "Doubles the value." })],
58+
}
59+
}
60+
}))
61+
{
62+
serverTask = server.RunAsync(TestContext.Current.CancellationToken);
63+
64+
await using (IMcpClient client = await McpClientFactory.CreateAsync(new()
65+
{
66+
Id = "TestServer",
67+
Name = "TestServer",
68+
TransportType = TransportTypes.StdIo,
69+
},
70+
createTransportFunc: (_, __) => clientTransport,
71+
cancellationToken: TestContext.Current.CancellationToken))
72+
{
73+
await action(client, server);
74+
}
75+
}
76+
77+
await serverTask;
78+
}
79+
}

tests/ModelContextProtocol.Tests/ModelContextProtocol.Tests.csproj

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
<PackageReference Include="Microsoft.Extensions.Logging.Console" />
2626
<PackageReference Include="Microsoft.NET.Test.Sdk" />
2727
<PackageReference Include="Moq" />
28+
<PackageReference Include="OpenTelemetry" />
29+
<PackageReference Include="OpenTelemetry.Exporter.InMemory" />
2830
<PackageReference Include="System.Linq.AsyncEnumerable" />
2931
<PackageReference Include="xunit.v3" />
3032
<PackageReference Include="xunit.runner.visualstudio">
Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,9 @@
1-
// Uncomment to disable parallel test execution
2-
//[assembly: CollectionBehavior(DisableTestParallelization = true)]
1+
// Uncomment to disable parallel test execution for the whole assembly
2+
//[assembly: CollectionBehavior(DisableTestParallelization = true)]
3+
4+
/// <summary>
5+
/// Enables test classes to individually be attributed as [Collection(nameof(DisableParallelization))]
6+
/// to have those tests run non-concurrently with any other tests.
7+
/// </summary>
8+
[CollectionDefinition(nameof(DisableParallelization), DisableParallelization = true)]
9+
public sealed class DisableParallelization;

0 commit comments

Comments
 (0)