Skip to content

Commit c368df3

Browse files
authored
Merge pull request #1037 from mysql-net/activity-source
Report telemetry with ActivitySource.
2 parents fd1f9de + 2c9f1e5 commit c368df3

File tree

12 files changed

+292
-88
lines changed

12 files changed

+292
-88
lines changed

docs/content/tutorials/tracing.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
---
2+
date: 2021-10-16
3+
menu:
4+
main:
5+
parent: tutorials
6+
title: Tracing
7+
weight: 12
8+
---
9+
10+
# Tracing
11+
12+
MySqlConnector implements `ActivitySource` for tracing its operations. The `ActivitySource` name is `MySqlConnector`.
13+
14+
The available activity names and tags are documented in [issue 1036](https://github.com/mysql-net/MySqlConnector/issues/1036).
15+
16+
## OpenTelemetry
17+
18+
To export traces using OpenTelemetry, install the [OpenTelemetry NuGet package](https://www.nuget.org/packages/OpenTelemetry/) and add code similar to the following:
19+
20+
```csharp
21+
using var openTelemetry = Sdk.CreateTracerProviderBuilder()
22+
.AddSource("MySqlConnector")
23+
// add a destination, for example:
24+
// .AddZipkinExporter(o => { o.Endpoint = new Uri(...); })
25+
// .AddJaegerExporter(o => { o.AgentHost = "..."; o.AgentPort = 6831; })
26+
```

src/MySqlConnector/Core/CommandExecutor.cs

Lines changed: 52 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System.Diagnostics;
12
using System.Net.Sockets;
23
using MySqlConnector.Logging;
34
using MySqlConnector.Protocol.Serialization;
@@ -7,62 +8,71 @@ namespace MySqlConnector.Core;
78

89
internal static class CommandExecutor
910
{
10-
public static async Task<MySqlDataReader> ExecuteReaderAsync(IReadOnlyList<IMySqlCommand> commands, ICommandPayloadCreator payloadCreator, CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken cancellationToken)
11+
public static async Task<MySqlDataReader> ExecuteReaderAsync(IReadOnlyList<IMySqlCommand> commands, ICommandPayloadCreator payloadCreator, CommandBehavior behavior, Activity? activity, IOBehavior ioBehavior, CancellationToken cancellationToken)
1112
{
12-
cancellationToken.ThrowIfCancellationRequested();
13-
var commandListPosition = new CommandListPosition(commands);
14-
var command = commands[0];
13+
try
14+
{
15+
cancellationToken.ThrowIfCancellationRequested();
16+
var commandListPosition = new CommandListPosition(commands);
17+
var command = commands[0];
1518

16-
// pre-requisite: Connection is non-null must be checked before calling this method
17-
var connection = command.Connection!;
19+
// pre-requisite: Connection is non-null must be checked before calling this method
20+
var connection = command.Connection!;
1821

19-
if (Log.IsTraceEnabled())
20-
Log.Trace("Session{0} ExecuteReader {1} CommandCount: {2}", connection.Session.Id, ioBehavior, commands.Count);
22+
if (Log.IsTraceEnabled())
23+
Log.Trace("Session{0} ExecuteReader {1} CommandCount: {2}", connection.Session.Id, ioBehavior, commands.Count);
2124

22-
Dictionary<string, CachedProcedure?>? cachedProcedures = null;
23-
foreach (var command2 in commands)
24-
{
25-
if (command2.CommandType == CommandType.StoredProcedure)
25+
Dictionary<string, CachedProcedure?>? cachedProcedures = null;
26+
foreach (var command2 in commands)
2627
{
27-
cachedProcedures ??= new();
28-
var commandText = command2.CommandText!;
29-
if (!cachedProcedures.ContainsKey(commandText))
28+
if (command2.CommandType == CommandType.StoredProcedure)
3029
{
31-
cachedProcedures.Add(commandText, await connection.GetCachedProcedure(commandText, revalidateMissing: false, ioBehavior, cancellationToken).ConfigureAwait(false));
30+
cachedProcedures ??= new();
31+
var commandText = command2.CommandText!;
32+
if (!cachedProcedures.ContainsKey(commandText))
33+
{
34+
cachedProcedures.Add(commandText, await connection.GetCachedProcedure(commandText, revalidateMissing: false, ioBehavior, cancellationToken).ConfigureAwait(false));
3235

33-
// because the connection was used to execute a MySqlDataReader with the connection's DefaultCommandTimeout,
34-
// we need to reapply the command's CommandTimeout (even if some of the time has elapsed)
35-
command.CancellableCommand.ResetCommandTimeout();
36+
// because the connection was used to execute a MySqlDataReader with the connection's DefaultCommandTimeout,
37+
// we need to reapply the command's CommandTimeout (even if some of the time has elapsed)
38+
command.CancellableCommand.ResetCommandTimeout();
39+
}
3640
}
3741
}
38-
}
3942

40-
var writer = new ByteBufferWriter();
41-
// cachedProcedures will be non-null if there is a stored procedure, which is also the only time it will be read
42-
if (!payloadCreator.WriteQueryCommand(ref commandListPosition, cachedProcedures!, writer))
43-
throw new InvalidOperationException("ICommandPayloadCreator failed to write query payload");
43+
var writer = new ByteBufferWriter();
44+
// cachedProcedures will be non-null if there is a stored procedure, which is also the only time it will be read
45+
if (!payloadCreator.WriteQueryCommand(ref commandListPosition, cachedProcedures!, writer))
46+
throw new InvalidOperationException("ICommandPayloadCreator failed to write query payload");
4447

45-
cancellationToken.ThrowIfCancellationRequested();
48+
cancellationToken.ThrowIfCancellationRequested();
4649

47-
using var payload = writer.ToPayloadData();
48-
connection.Session.StartQuerying(command.CancellableCommand);
49-
command.SetLastInsertedId(-1);
50-
try
51-
{
52-
await connection.Session.SendAsync(payload, ioBehavior, CancellationToken.None).ConfigureAwait(false);
53-
return await MySqlDataReader.CreateAsync(commandListPosition, payloadCreator, cachedProcedures, command, behavior, ioBehavior, cancellationToken).ConfigureAwait(false);
54-
}
55-
catch (MySqlException ex) when (ex.ErrorCode == MySqlErrorCode.QueryInterrupted && cancellationToken.IsCancellationRequested)
56-
{
57-
Log.Info("Session{0} query was interrupted", connection.Session.Id);
58-
throw new OperationCanceledException(ex. Message, ex, cancellationToken);
50+
using var payload = writer.ToPayloadData();
51+
connection.Session.StartQuerying(command.CancellableCommand);
52+
command.SetLastInsertedId(-1);
53+
try
54+
{
55+
await connection.Session.SendAsync(payload, ioBehavior, CancellationToken.None).ConfigureAwait(false);
56+
return await MySqlDataReader.CreateAsync(commandListPosition, payloadCreator, cachedProcedures, command, behavior, activity, ioBehavior, cancellationToken).ConfigureAwait(false);
57+
}
58+
catch (MySqlException ex) when (ex.ErrorCode == MySqlErrorCode.QueryInterrupted && cancellationToken.IsCancellationRequested)
59+
{
60+
Log.Info("Session{0} query was interrupted", connection.Session.Id);
61+
throw new OperationCanceledException(ex.Message, ex, cancellationToken);
62+
}
63+
catch (Exception ex) when (payload.Span.Length > 4_194_304 && (ex is SocketException or IOException or MySqlProtocolException))
64+
{
65+
// the default MySQL Server value for max_allowed_packet (in MySQL 5.7) is 4MiB: https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html#sysvar_max_allowed_packet
66+
// use "decimal megabytes" (to round up) when creating the exception message
67+
int megabytes = payload.Span.Length / 1_000_000;
68+
throw new MySqlException("Error submitting {0}MB packet; ensure 'max_allowed_packet' is greater than {0}MB.".FormatInvariant(megabytes), ex);
69+
}
5970
}
60-
catch (Exception ex) when (payload.Span.Length > 4_194_304 && (ex is SocketException or IOException or MySqlProtocolException))
71+
catch (Exception ex) when (activity is { IsAllDataRequested: true })
6172
{
62-
// the default MySQL Server value for max_allowed_packet (in MySQL 5.7) is 4MiB: https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html#sysvar_max_allowed_packet
63-
// use "decimal megabytes" (to round up) when creating the exception message
64-
int megabytes = payload.Span.Length / 1_000_000;
65-
throw new MySqlException("Error submitting {0}MB packet; ensure 'max_allowed_packet' is greater than {0}MB.".FormatInvariant(megabytes), ex);
73+
activity.SetException(ex);
74+
activity.Stop();
75+
throw;
6676
}
6777
}
6878

src/MySqlConnector/Core/ResultSet.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System.Buffers;
2+
using System.Diagnostics;
23
using System.Globalization;
34
using System.Runtime.ExceptionServices;
45
using MySqlConnector.Protocol;
@@ -41,7 +42,6 @@ public async Task ReadResultSetHeaderAsync(IOBehavior ioBehavior)
4142
while (true)
4243
{
4344
var payload = await Session.ReceiveReplyAsync(ioBehavior, CancellationToken.None).ConfigureAwait(false);
44-
4545
var firstByte = payload.HeaderByte;
4646
if (firstByte == OkPayload.Signature)
4747
{
@@ -112,7 +112,7 @@ public async Task ReadResultSetHeaderAsync(IOBehavior ioBehavior)
112112
}
113113
else
114114
{
115-
int ReadColumnCount(ReadOnlySpan<byte> span)
115+
static int ReadColumnCount(ReadOnlySpan<byte> span)
116116
{
117117
var reader = new ByteArrayReader(span);
118118
var columnCount_ = (int) reader.ReadLengthEncodedInteger();
@@ -154,6 +154,8 @@ int ReadColumnCount(ReadOnlySpan<byte> span)
154154
ContainsCommandParameters = true;
155155
WarningCount = 0;
156156
State = ResultSetState.ReadResultSetHeader;
157+
if (DataReader.Activity is { IsAllDataRequested: true })
158+
DataReader.Activity.AddEvent(new ActivityEvent("read-result-set-header"));
157159
break;
158160
}
159161
}

src/MySqlConnector/Core/ServerSession.cs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ public ServerSession(ConnectionPool? pool, int poolGeneration, int id)
3838
PoolGeneration = poolGeneration;
3939
HostName = "";
4040
m_logArguments = new object?[] { "{0}".FormatInvariant(Id), null };
41+
m_activityTags = new ActivityTagsCollection
42+
{
43+
{ ActivitySourceHelper.DatabaseSystemTagName, ActivitySourceHelper.DatabaseSystemValue },
44+
};
4145
Log.Trace("Session{0} created new session", m_logArguments);
4246
}
4347

@@ -59,6 +63,7 @@ public ServerSession(ConnectionPool? pool, int poolGeneration, int id)
5963
public bool SupportsDeprecateEof => m_supportsDeprecateEof;
6064
public bool SupportsSessionTrack => m_supportsSessionTrack;
6165
public bool ProcAccessDenied { get; set; }
66+
public ICollection<KeyValuePair<string, object?>> ActivityTags => m_activityTags;
6267

6368
#if NETCOREAPP2_1_OR_GREATER || NETSTANDARD2_1_OR_GREATER
6469
public ValueTask ReturnToPoolAsync(IOBehavior ioBehavior, MySqlConnection? owningConnection)
@@ -330,6 +335,19 @@ public void FinishQuerying()
330335

331336
public void SetTimeout(int timeoutMilliseconds) => m_payloadHandler!.ByteHandler.RemainingTimeout = timeoutMilliseconds;
332337

338+
public Activity? StartActivity(string name, string? tagName1 = null, object? tagValue1 = null)
339+
{
340+
var activity = ActivitySourceHelper.StartActivity(name, m_activityTags);
341+
if (activity is { IsAllDataRequested: true })
342+
{
343+
if (DatabaseOverride is not null)
344+
activity.SetTag(ActivitySourceHelper.DatabaseNameTagName, DatabaseOverride);
345+
if (tagName1 is not null)
346+
activity.SetTag(tagName1, tagValue1);
347+
}
348+
return activity;
349+
}
350+
333351
public async Task DisposeAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
334352
{
335353
if (m_payloadHandler is not null)
@@ -515,6 +533,12 @@ public async Task DisposeAsync(IOBehavior ioBehavior, CancellationToken cancella
515533
await GetRealServerDetailsAsync(ioBehavior, CancellationToken.None).ConfigureAwait(false);
516534

517535
m_payloadHandler.ByteHandler.RemainingTimeout = Constants.InfiniteTimeout;
536+
537+
m_activityTags.Add(ActivitySourceHelper.DatabaseConnectionIdTagName, ConnectionId.ToString(CultureInfo.InvariantCulture));
538+
m_activityTags.Add(ActivitySourceHelper.DatabaseConnectionStringTagName, cs.ConnectionStringBuilder.GetConnectionString(cs.ConnectionStringBuilder.PersistSecurityInfo));
539+
m_activityTags.Add(ActivitySourceHelper.DatabaseUserTagName, cs.UserID);
540+
if (cs.Database.Length != 0)
541+
m_activityTags.Add(ActivitySourceHelper.DatabaseNameTagName, cs.Database);
518542
}
519543
catch (ArgumentException ex)
520544
{
@@ -1032,6 +1056,14 @@ private async Task<bool> OpenTcpSocketAsync(ConnectionSettings cs, ILoadBalancer
10321056
m_socket.NoDelay = true;
10331057
m_stream = m_tcpClient.GetStream();
10341058
m_socket.SetKeepAlive(cs.Keepalive);
1059+
1060+
m_activityTags.Add(ActivitySourceHelper.NetTransportTagName, ActivitySourceHelper.NetTransportTcpIpValue);
1061+
var ipAddressString = ipAddress.ToString();
1062+
m_activityTags.Add(ActivitySourceHelper.NetPeerIpTagName, ipAddressString);
1063+
if (ipAddressString != hostName)
1064+
m_activityTags.Add(ActivitySourceHelper.NetPeerNameTagName, hostName);
1065+
if (cs.Port != 3306)
1066+
m_activityTags.Add(ActivitySourceHelper.NetPeerPortTagName, cs.Port.ToString(CultureInfo.InvariantCulture));
10351067
}
10361068
catch (ObjectDisposedException) when (cancellationToken.IsCancellationRequested)
10371069
{
@@ -1089,6 +1121,9 @@ private async Task<bool> OpenUnixSocketAsync(ConnectionSettings cs, IOBehavior i
10891121
m_socket = socket;
10901122
m_stream = new NetworkStream(socket);
10911123

1124+
m_activityTags.Add(ActivitySourceHelper.NetTransportTagName, ActivitySourceHelper.NetTransportUnixValue);
1125+
m_activityTags.Add(ActivitySourceHelper.NetPeerNameTagName, cs.UnixSocket);
1126+
10921127
lock (m_lock)
10931128
m_state = State.Connected;
10941129
return true;
@@ -1139,6 +1174,10 @@ private async Task<bool> OpenNamedPipeAsync(ConnectionSettings cs, int startTick
11391174
{
11401175
m_stream = namedPipeStream;
11411176

1177+
// see https://docs.microsoft.com/en-us/windows/win32/ipc/pipe-names for pipe name format
1178+
m_activityTags.Add(ActivitySourceHelper.NetTransportTagName, ActivitySourceHelper.NetTransportNamedPipeValue);
1179+
m_activityTags.Add(ActivitySourceHelper.NetPeerNameTagName, @"\\" + cs.HostNames![0] + @"\pipe\" + cs.PipeName);
1180+
11421181
lock (m_lock)
11431182
m_state = State.Connected;
11441183
return true;
@@ -1790,5 +1829,6 @@ protected override void OnStatementBegin(int index)
17901829
bool m_supportsSessionTrack;
17911830
CharacterSet m_characterSet;
17921831
PayloadData m_setNamesPayload;
1832+
ActivityTagsCollection m_activityTags;
17931833
Dictionary<string, PreparedStatements>? m_preparedStatements;
17941834
}

src/MySqlConnector/MySqlBatch.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ private Task<MySqlDataReader> ExecuteReaderAsync(CommandBehavior behavior, IOBeh
157157
var payloadCreator = Connection!.Session.SupportsComMulti ? BatchedCommandPayloadCreator.Instance :
158158
IsPrepared ? SingleCommandPayloadCreator.Instance :
159159
ConcatenatedCommandPayloadCreator.Instance;
160-
return CommandExecutor.ExecuteReaderAsync(BatchCommands!.Commands, payloadCreator, behavior, ioBehavior, cancellationToken);
160+
return CommandExecutor.ExecuteReaderAsync(BatchCommands!.Commands, payloadCreator, behavior, default, ioBehavior, cancellationToken);
161161
}
162162

163163
#if NET6_0_OR_GREATER

src/MySqlConnector/MySqlCommand.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System.Diagnostics;
12
using System.Diagnostics.CodeAnalysis;
23
using MySqlConnector.Core;
34
using MySqlConnector.Logging;
@@ -130,6 +131,7 @@ public override void Prepare()
130131
bool IMySqlCommand.AllowUserVariables => AllowUserVariables;
131132

132133
internal bool AllowUserVariables { get; set; }
134+
internal bool NoActivity { get; set; }
133135

134136
private Task PrepareAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
135137
{
@@ -315,8 +317,10 @@ internal Task<MySqlDataReader> ExecuteReaderNoResetTimeoutAsync(CommandBehavior
315317
if (!IsValid(out var exception))
316318
return Utility.TaskFromException<MySqlDataReader>(exception);
317319

320+
var activity = NoActivity ? null : Connection!.Session.StartActivity(ActivitySourceHelper.ExecuteActivityName,
321+
ActivitySourceHelper.DatabaseStatementTagName, CommandText);
318322
m_commandBehavior = behavior;
319-
return CommandExecutor.ExecuteReaderAsync(new IMySqlCommand[] { this }, SingleCommandPayloadCreator.Instance, behavior, ioBehavior, cancellationToken);
323+
return CommandExecutor.ExecuteReaderAsync(new IMySqlCommand[] { this }, SingleCommandPayloadCreator.Instance, behavior, activity, ioBehavior, cancellationToken);
320324
}
321325

322326
public MySqlCommand Clone() => new(this);

0 commit comments

Comments
 (0)