Skip to content

Commit d96d272

Browse files
authored
Add client span to SignalR .NET client (#57101)
1 parent c430536 commit d96d272

File tree

13 files changed

+1252
-316
lines changed

13 files changed

+1252
-316
lines changed

src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs

Lines changed: 133 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ public partial class HubConnection : IAsyncDisposable
5959
// Default amount of bytes we'll buffer when using Stateful Reconnect until applying backpressure to sends from the client.
6060
internal const long DefaultStatefulReconnectBufferSize = 100_000;
6161

62+
internal const string ActivityName = "Microsoft.AspNetCore.SignalR.Client.InvocationOut";
63+
6264
// The receive loop has a single reader and single writer at a time so optimize the channel for that
6365
private static readonly UnboundedChannelOptions _receiveLoopOptions = new UnboundedChannelOptions
6466
{
@@ -73,11 +75,13 @@ public partial class HubConnection : IAsyncDisposable
7375
private readonly ILoggerFactory _loggerFactory;
7476
private readonly ILogger _logger;
7577
private readonly ConnectionLogScope _logScope;
78+
private readonly ActivitySource _activitySource;
7679
private readonly IHubProtocol _protocol;
7780
private readonly IServiceProvider _serviceProvider;
7881
private readonly IConnectionFactory _connectionFactory;
7982
private readonly IRetryPolicy? _reconnectPolicy;
8083
private readonly EndPoint _endPoint;
84+
private readonly string? _serviceName;
8185
private readonly ConcurrentDictionary<string, InvocationHandlerList> _handlers = new ConcurrentDictionary<string, InvocationHandlerList>(StringComparer.Ordinal);
8286

8387
// Holds all mutable state other than user-defined handlers and settable properties.
@@ -235,6 +239,10 @@ public HubConnection(IConnectionFactory connectionFactory,
235239

236240
_logScope = new ConnectionLogScope();
237241

242+
// ActivitySource can be resolved from the service provider when unit testing.
243+
_activitySource = (serviceProvider.GetService<SignalRClientActivitySource>() ?? SignalRClientActivitySource.Instance).ActivitySource;
244+
_serviceName = (_endPoint is UriEndPoint e) ? e.Uri.AbsolutePath.Trim('/') : null;
245+
238246
var options = serviceProvider.GetService<IOptions<HubConnectionOptions>>();
239247

240248
ServerTimeout = options?.Value.ServerTimeout ?? DefaultServerTimeout;
@@ -720,7 +728,8 @@ async Task OnStreamCanceled(InvocationRequest irq)
720728
var readers = default(Dictionary<string, object>);
721729

722730
CheckDisposed();
723-
var connectionState = await _state.WaitForActiveConnectionAsync(nameof(StreamAsChannelCoreAsync), token: cancellationToken).ConfigureAwait(false);
731+
732+
var (connectionState, activity) = await WaitForActiveConnectionWithActivityAsync(nameof(StreamAsChannelCoreAsync), methodName, token: cancellationToken).ConfigureAwait(false);
724733

725734
ChannelReader<object?> channel;
726735
try
@@ -731,7 +740,7 @@ async Task OnStreamCanceled(InvocationRequest irq)
731740
readers = PackageStreamingParams(connectionState, ref args, out var streamIds);
732741

733742
// I just want an excuse to use 'irq' as a variable name...
734-
var irq = InvocationRequest.Stream(cancellationToken, returnType, connectionState.GetNextId(), _loggerFactory, this, out channel);
743+
var irq = InvocationRequest.Stream(cancellationToken, returnType, connectionState.GetNextId(), _loggerFactory, this, activity, out channel);
735744
await InvokeStreamCore(connectionState, methodName, irq, args, streamIds?.ToArray(), cancellationToken).ConfigureAwait(false);
736745

737746
if (cancellationToken.CanBeCanceled)
@@ -1003,12 +1012,71 @@ private async Task CommonStreaming(ConnectionState connectionState, string strea
10031012
}
10041013
}
10051014

1015+
private async Task<(ConnectionState, Activity?)> WaitForActiveConnectionWithActivityAsync(string sendingMethodName, string invokedMethodName, CancellationToken token)
1016+
{
1017+
// Start the activity before waiting on the connection.
1018+
// Starting the activity here means time to connect or reconnect is included in the invoke.
1019+
var activity = CreateActivity(invokedMethodName);
1020+
1021+
try
1022+
{
1023+
ConnectionState connectionState;
1024+
var connectionStateTask = _state.WaitForActiveConnectionAsync(sendingMethodName, token);
1025+
if (connectionStateTask.Status == TaskStatus.RanToCompletion)
1026+
{
1027+
// Attempt to get already connected connection and set server tags using it.
1028+
connectionState = connectionStateTask.Result;
1029+
SetServerTags(activity, connectionState.ConnectionUrl);
1030+
activity?.Start();
1031+
}
1032+
else
1033+
{
1034+
// Fallback to using configured endpoint.
1035+
var initialUri = (_endPoint as UriEndPoint)?.Uri;
1036+
SetServerTags(activity, initialUri);
1037+
activity?.Start();
1038+
1039+
connectionState = await connectionStateTask.ConfigureAwait(false);
1040+
1041+
// After connection is returned, check if URL is different. If so, update activity server tags.
1042+
if (connectionState.ConnectionUrl != null && connectionState.ConnectionUrl != initialUri)
1043+
{
1044+
SetServerTags(activity, connectionState.ConnectionUrl);
1045+
}
1046+
}
1047+
1048+
return (connectionState, activity);
1049+
}
1050+
catch (Exception ex)
1051+
{
1052+
// If there is an error getting an active connection then the invocation has failed.
1053+
if (activity is not null)
1054+
{
1055+
activity.SetStatus(ActivityStatusCode.Error);
1056+
activity.SetTag("error.type", ex.GetType().FullName);
1057+
activity.Stop();
1058+
}
1059+
1060+
throw;
1061+
}
1062+
1063+
static void SetServerTags(Activity? activity, Uri? uri)
1064+
{
1065+
if (activity != null && uri != null)
1066+
{
1067+
activity.SetTag("server.address", uri.Host);
1068+
activity.SetTag("server.port", uri.Port);
1069+
}
1070+
}
1071+
}
1072+
10061073
private async Task<object?> InvokeCoreAsyncCore(string methodName, Type returnType, object?[] args, CancellationToken cancellationToken)
10071074
{
10081075
var readers = default(Dictionary<string, object>);
10091076

10101077
CheckDisposed();
1011-
var connectionState = await _state.WaitForActiveConnectionAsync(nameof(InvokeCoreAsync), token: cancellationToken).ConfigureAwait(false);
1078+
1079+
var (connectionState, activity) = await WaitForActiveConnectionWithActivityAsync(nameof(InvokeCoreAsync), methodName, token: cancellationToken).ConfigureAwait(false);
10121080

10131081
Task<object?> invocationTask;
10141082
try
@@ -1017,7 +1085,7 @@ private async Task CommonStreaming(ConnectionState connectionState, string strea
10171085

10181086
readers = PackageStreamingParams(connectionState, ref args, out var streamIds);
10191087

1020-
var irq = InvocationRequest.Invoke(cancellationToken, returnType, connectionState.GetNextId(), _loggerFactory, this, out invocationTask);
1088+
var irq = InvocationRequest.Invoke(cancellationToken, returnType, connectionState.GetNextId(), _loggerFactory, this, activity, out invocationTask);
10211089
await InvokeCore(connectionState, methodName, irq, args, streamIds?.ToArray(), cancellationToken).ConfigureAwait(false);
10221090

10231091
LaunchStreams(connectionState, readers, cancellationToken);
@@ -1031,13 +1099,43 @@ private async Task CommonStreaming(ConnectionState connectionState, string strea
10311099
return await invocationTask.ConfigureAwait(false);
10321100
}
10331101

1102+
private Activity? CreateActivity(string methodName)
1103+
{
1104+
var activity = _activitySource.CreateActivity(ActivityName, ActivityKind.Client);
1105+
if (activity is null && Activity.Current is not null && _logger.IsEnabled(LogLevel.Critical))
1106+
{
1107+
activity = new Activity(ActivityName);
1108+
}
1109+
1110+
if (activity is not null)
1111+
{
1112+
if (!string.IsNullOrEmpty(_serviceName))
1113+
{
1114+
activity.DisplayName = $"{_serviceName}/{methodName}";
1115+
activity.SetTag("rpc.service", _serviceName);
1116+
}
1117+
else
1118+
{
1119+
activity.DisplayName = methodName;
1120+
}
1121+
1122+
activity.SetTag("rpc.system", "signalr");
1123+
activity.SetTag("rpc.method", methodName);
1124+
}
1125+
1126+
return activity;
1127+
}
1128+
10341129
private async Task InvokeCore(ConnectionState connectionState, string methodName, InvocationRequest irq, object?[] args, string[]? streams, CancellationToken cancellationToken)
10351130
{
10361131
Log.PreparingBlockingInvocation(_logger, irq.InvocationId, methodName, irq.ResultType.FullName!, args.Length);
10371132

10381133
// Client invocations are always blocking
10391134
var invocationMessage = new InvocationMessage(irq.InvocationId, methodName, args, streams);
1040-
InjectHeaders(invocationMessage);
1135+
if (irq.Activity is not null)
1136+
{
1137+
InjectHeaders(irq.Activity, invocationMessage);
1138+
}
10411139

10421140
Log.RegisteringInvocation(_logger, irq.InvocationId);
10431141
connectionState.AddInvocation(irq);
@@ -1064,7 +1162,10 @@ private async Task InvokeStreamCore(ConnectionState connectionState, string meth
10641162
Log.PreparingStreamingInvocation(_logger, irq.InvocationId, methodName, irq.ResultType.FullName!, args.Length);
10651163

10661164
var invocationMessage = new StreamInvocationMessage(irq.InvocationId, methodName, args, streams);
1067-
InjectHeaders(invocationMessage);
1165+
if (irq.Activity is not null)
1166+
{
1167+
InjectHeaders(irq.Activity, invocationMessage);
1168+
}
10681169

10691170
Log.RegisteringInvocation(_logger, irq.InvocationId);
10701171

@@ -1085,23 +1186,16 @@ private async Task InvokeStreamCore(ConnectionState connectionState, string meth
10851186
}
10861187
}
10871188

1088-
private static void InjectHeaders(HubInvocationMessage invocationMessage)
1189+
private static void InjectHeaders(Activity currentActivity, HubInvocationMessage invocationMessage)
10891190
{
1090-
// TODO: Change when SignalR client has an activity.
1091-
// This sends info about the current activity, regardless of the activity source, to the SignalR server.
1092-
// When SignalR client supports client activities this logic should be updated to only send headers
1093-
// if the SignalR client activity is created. The goal is to match the behavior of distributed tracing in HttpClient.
1094-
if (Activity.Current is { } currentActivity)
1191+
DistributedContextPropagator.Current.Inject(currentActivity, invocationMessage, static (carrier, key, value) =>
10951192
{
1096-
DistributedContextPropagator.Current.Inject(currentActivity, invocationMessage, static (carrier, key, value) =>
1193+
if (carrier is HubInvocationMessage invocationMessage)
10971194
{
1098-
if (carrier is HubInvocationMessage invocationMessage)
1099-
{
1100-
invocationMessage.Headers ??= new Dictionary<string, string>();
1101-
invocationMessage.Headers[key] = value;
1102-
}
1103-
});
1104-
}
1195+
invocationMessage.Headers ??= new Dictionary<string, string>();
1196+
invocationMessage.Headers[key] = value;
1197+
}
1198+
});
11051199
}
11061200

11071201
private async Task SendHubMessage(ConnectionState connectionState, HubMessage hubMessage, CancellationToken cancellationToken = default)
@@ -1131,7 +1225,8 @@ private async Task SendCoreAsyncCore(string methodName, object?[] args, Cancella
11311225
var readers = default(Dictionary<string, object>);
11321226

11331227
CheckDisposed();
1134-
var connectionState = await _state.WaitForActiveConnectionAsync(nameof(SendCoreAsync), token: cancellationToken).ConfigureAwait(false);
1228+
1229+
var (connectionState, activity) = await WaitForActiveConnectionWithActivityAsync(nameof(SendCoreAsync), methodName, token: cancellationToken).ConfigureAwait(false);
11351230
try
11361231
{
11371232
CheckDisposed();
@@ -1140,12 +1235,27 @@ private async Task SendCoreAsyncCore(string methodName, object?[] args, Cancella
11401235

11411236
Log.PreparingNonBlockingInvocation(_logger, methodName, args.Length);
11421237
var invocationMessage = new InvocationMessage(null, methodName, args, streamIds?.ToArray());
1238+
if (activity is not null)
1239+
{
1240+
InjectHeaders(activity, invocationMessage);
1241+
}
11431242
await SendHubMessage(connectionState, invocationMessage, cancellationToken).ConfigureAwait(false);
11441243

11451244
LaunchStreams(connectionState, readers, cancellationToken);
11461245
}
1246+
catch (Exception ex)
1247+
{
1248+
if (activity is not null)
1249+
{
1250+
activity.SetStatus(ActivityStatusCode.Error);
1251+
activity.SetTag("error.type", ex.GetType().FullName);
1252+
activity.Stop();
1253+
}
1254+
throw;
1255+
}
11471256
finally
11481257
{
1258+
activity?.Stop();
11491259
_state.ReleaseConnectionLock();
11501260
}
11511261
}
@@ -2018,6 +2128,7 @@ private sealed class ConnectionState : IInvocationBinder
20182128
private long _nextActivationSendPing;
20192129

20202130
public ConnectionContext Connection { get; }
2131+
public Uri? ConnectionUrl { get; }
20212132
public Task? ReceiveTask { get; set; }
20222133
public Exception? CloseException { get; set; }
20232134
public CancellationToken UploadStreamToken { get; set; }
@@ -2036,6 +2147,7 @@ public bool Stopping
20362147
public ConnectionState(ConnectionContext connection, HubConnection hubConnection)
20372148
{
20382149
Connection = connection;
2150+
ConnectionUrl = (connection.RemoteEndPoint is UriEndPoint ep) ? ep.Uri : null;
20392151

20402152
_hubConnection = hubConnection;
20412153
_hubConnection._logScope.ConnectionId = connection.ConnectionId;

0 commit comments

Comments
 (0)