Skip to content
98 changes: 18 additions & 80 deletions src/MySqlConnector/Core/ConnectionPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,11 @@ public async ValueTask<ServerSession> GetSessionAsync(MySqlConnection connection
if (ConnectionSettings.ConnectionReset || session.DatabaseOverride is not null)
{
if (timeoutMilliseconds != 0)
session.SetTimeout(Math.Max(1, timeoutMilliseconds - Utility.GetElapsedMilliseconds(startingTimestamp)));
reuseSession = await session.TryResetConnectionAsync(ConnectionSettings, connection, ioBehavior, cancellationToken).ConfigureAwait(false);
session.SetTimeout(Math.Max(1,
timeoutMilliseconds - Utility.GetElapsedMilliseconds(startingTimestamp)));
reuseSession = await session
.TryResetConnectionAsync(ConnectionSettings, connection, ioBehavior, cancellationToken)
.ConfigureAwait(false);
session.SetTimeout(Constants.InfiniteTimeout);
}
else
Expand All @@ -95,18 +98,24 @@ public async ValueTask<ServerSession> GetSessionAsync(MySqlConnection connection
m_leasedSessions.Add(session.Id, session);
leasedSessionsCountPooled = m_leasedSessions.Count;
}

MetricsReporter.AddUsed(this);
ActivitySourceHelper.CopyTags(session.ActivityTags, activity);
Log.ReturningPooledSession(m_logger, Id, session.Id, leasedSessionsCountPooled);

session.LastLeasedTimestamp = Stopwatch.GetTimestamp();
MetricsReporter.RecordWaitTime(this, Utility.GetElapsedSeconds(startingTimestamp, session.LastLeasedTimestamp));
MetricsReporter.RecordWaitTime(this,
Utility.GetElapsedSeconds(startingTimestamp, session.LastLeasedTimestamp));
return session;
}
}

// create a new session
session = await ConnectSessionAsync(connection, s_createdNewSession, startingTimestamp, activity, ioBehavior, cancellationToken).ConfigureAwait(false);
session = await ServerSession.ConnectAndRedirectAsync(
() => new ServerSession(m_connectionLogger, this, m_generation,
Interlocked.Increment(ref m_lastSessionId)), m_logger, Id, ConnectionSettings, m_loadBalancer,
connection, s_createdNewSession, startingTimestamp, activity, ioBehavior, cancellationToken)
.ConfigureAwait(false);
AdjustHostConnectionCount(session, 1);
session.OwningConnection = new(connection);
int leasedSessionsCountNew;
Expand Down Expand Up @@ -402,7 +411,11 @@ private async Task CreateMinimumPooledSessions(MySqlConnection connection, IOBeh

try
{
var session = await ConnectSessionAsync(connection, s_createdToReachMinimumPoolSize, Stopwatch.GetTimestamp(), null, ioBehavior, cancellationToken).ConfigureAwait(false);
var session = await ServerSession.ConnectAndRedirectAsync(
() => new ServerSession(m_connectionLogger, this, m_generation,
Interlocked.Increment(ref m_lastSessionId)), m_logger, Id, ConnectionSettings, m_loadBalancer,
connection, s_createdToReachMinimumPoolSize, Stopwatch.GetTimestamp(), null, ioBehavior,
cancellationToken).ConfigureAwait(false);
AdjustHostConnectionCount(session, 1);
lock (m_sessions)
_ = m_sessions.AddFirst(session);
Expand All @@ -416,81 +429,6 @@ private async Task CreateMinimumPooledSessions(MySqlConnection connection, IOBeh
}
}

private async ValueTask<ServerSession> ConnectSessionAsync(MySqlConnection connection, Action<ILogger, int, string, Exception?> logMessage, long startingTimestamp, Activity? activity, IOBehavior ioBehavior, CancellationToken cancellationToken)
{
var session = new ServerSession(m_connectionLogger, this, m_generation, Interlocked.Increment(ref m_lastSessionId));
if (m_logger.IsEnabled(LogLevel.Debug))
logMessage(m_logger, Id, session.Id, null);
string? statusInfo;
try
{
statusInfo = await session.ConnectAsync(ConnectionSettings, connection, startingTimestamp, m_loadBalancer, activity, ioBehavior, cancellationToken).ConfigureAwait(false);
}
catch (Exception)
{
await session.DisposeAsync(ioBehavior, default).ConfigureAwait(false);
throw;
}

Exception? redirectionException = null;
if (statusInfo is not null && statusInfo.StartsWith("Location: mysql://", StringComparison.Ordinal))
{
// server redirection string has the format "Location: mysql://{host}:{port}/user={userId}[&ttl={ttl}]"
Log.HasServerRedirectionHeader(m_logger, session.Id, statusInfo);

if (ConnectionSettings.ServerRedirectionMode == MySqlServerRedirectionMode.Disabled)
{
Log.ServerRedirectionIsDisabled(m_logger, Id);
}
else if (Utility.TryParseRedirectionHeader(statusInfo, out var host, out var port, out var user))
{
if (host != ConnectionSettings.HostNames![0] || port != ConnectionSettings.Port || user != ConnectionSettings.UserID)
{
var redirectedSettings = ConnectionSettings.CloneWith(host, port, user);
Log.OpeningNewConnection(m_logger, Id, host, port, user);
var redirectedSession = new ServerSession(m_connectionLogger, this, m_generation, Interlocked.Increment(ref m_lastSessionId));
try
{
_ = await redirectedSession.ConnectAsync(redirectedSettings, connection, startingTimestamp, m_loadBalancer, activity, ioBehavior, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
Log.FailedToConnectRedirectedSession(m_logger, ex, Id, redirectedSession.Id);
redirectionException = ex;
}

if (redirectionException is null)
{
Log.ClosingSessionToUseRedirectedSession(m_logger, Id, session.Id, redirectedSession.Id);
await session.DisposeAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
return redirectedSession;
}
else
{
try
{
await redirectedSession.DisposeAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
}
catch (Exception)
{
}
}
}
else
{
Log.SessionAlreadyConnectedToServer(m_logger, session.Id);
}
}
}

if (ConnectionSettings.ServerRedirectionMode == MySqlServerRedirectionMode.Required)
{
Log.RequiresServerRedirection(m_logger, Id);
throw new MySqlException(MySqlErrorCode.UnableToConnectToHost, "Server does not support redirection", redirectionException);
}
return session;
}

public static ConnectionPool? CreatePool(string connectionString, MySqlConnectorLoggingConfiguration loggingConfiguration, string? name)
{
// parse connection string and check for 'Pooling' setting; return 'null' if pooling is disabled
Expand Down
8 changes: 6 additions & 2 deletions src/MySqlConnector/Core/ConnectionSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,12 @@ public int ConnectionTimeoutMilliseconds

private ConnectionSettings(ConnectionSettings other, string host, int port, string userId)
{
ConnectionStringBuilder = other.ConnectionStringBuilder;
ConnectionString = other.ConnectionString;
ConnectionStringBuilder = new MySqlConnectionStringBuilder(other.ConnectionString);
ConnectionStringBuilder.Port = (uint)port;
ConnectionStringBuilder.Server = host;
ConnectionStringBuilder.UserID = userId;

ConnectionString = ConnectionStringBuilder.ConnectionString;

ConnectionProtocol = MySqlConnectionProtocol.Sockets;
HostNames = [host];
Expand Down
89 changes: 83 additions & 6 deletions src/MySqlConnector/Core/ServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public ServerSession(ILogger logger, ConnectionPool? pool, int poolGeneration, i
public bool SupportsPerQueryVariables => ServerVersion.IsMariaDb && ServerVersion.Version >= ServerVersions.MariaDbSupportsPerQueryVariables;
public int ActiveCommandId { get; private set; }
public int CancellationTimeout { get; private set; }
public string? ConnectionString { get; private set; }
public int ConnectionId { get; set; }
public byte[]? AuthPluginData { get; set; }
public long CreatedTimestamp { get; }
Expand Down Expand Up @@ -391,7 +392,7 @@ public async Task DisposeAsync(IOBehavior ioBehavior, CancellationToken cancella
m_state = State.Closed;
}

public async Task<string?> ConnectAsync(ConnectionSettings cs, MySqlConnection connection, long startingTimestamp, ILoadBalancer? loadBalancer, Activity? activity, IOBehavior ioBehavior, CancellationToken cancellationToken)
private async Task<string?> ConnectAsync(ConnectionSettings cs, MySqlConnection connection, long startingTimestamp, ILoadBalancer? loadBalancer, Activity? activity, IOBehavior ioBehavior, CancellationToken cancellationToken)
{
try
{
Expand All @@ -403,16 +404,16 @@ public async Task DisposeAsync(IOBehavior ioBehavior, CancellationToken cancella

// set activity tags
{
var connectionString = cs.ConnectionStringBuilder.GetConnectionString(cs.ConnectionStringBuilder.PersistSecurityInfo);
ConnectionString = cs.ConnectionStringBuilder.GetConnectionString(cs.ConnectionStringBuilder.PersistSecurityInfo);
m_activityTags.Add(ActivitySourceHelper.DatabaseSystemTagName, ActivitySourceHelper.DatabaseSystemValue);
m_activityTags.Add(ActivitySourceHelper.DatabaseConnectionStringTagName, connectionString);
m_activityTags.Add(ActivitySourceHelper.DatabaseConnectionStringTagName, ConnectionString);
m_activityTags.Add(ActivitySourceHelper.DatabaseUserTagName, cs.UserID);
if (cs.Database.Length != 0)
m_activityTags.Add(ActivitySourceHelper.DatabaseNameTagName, cs.Database);
if (activity is { IsAllDataRequested: true })
{
activity.SetTag(ActivitySourceHelper.DatabaseSystemTagName, ActivitySourceHelper.DatabaseSystemValue)
.SetTag(ActivitySourceHelper.DatabaseConnectionStringTagName, connectionString)
.SetTag(ActivitySourceHelper.DatabaseConnectionStringTagName, ConnectionString)
.SetTag(ActivitySourceHelper.DatabaseUserTagName, cs.UserID);
if (cs.Database.Length != 0)
activity.SetTag(ActivitySourceHelper.DatabaseNameTagName, cs.Database);
Expand Down Expand Up @@ -533,7 +534,7 @@ public async Task DisposeAsync(IOBehavior ioBehavior, CancellationToken cancella
}

var ok = OkPayload.Create(payload.Span, this);
var statusInfo = ok.StatusInfo;
var redirectionUrl = ok.RedirectionUrl;

if (m_useCompression)
m_payloadHandler = new CompressedPayloadHandler(m_payloadHandler.ByteHandler);
Expand All @@ -558,7 +559,7 @@ public async Task DisposeAsync(IOBehavior ioBehavior, CancellationToken cancella
}

m_payloadHandler.ByteHandler.RemainingTimeout = Constants.InfiniteTimeout;
return statusInfo;
return redirectionUrl;
}
catch (ArgumentException ex)
{
Expand All @@ -572,6 +573,82 @@ public async Task DisposeAsync(IOBehavior ioBehavior, CancellationToken cancella
}
}

public static async ValueTask<ServerSession> ConnectAndRedirectAsync(Func<ServerSession> createSession, ILogger logger, int? poolId, ConnectionSettings cs, ILoadBalancer? loadBalancer, MySqlConnection connection, Action<ILogger, int, string, Exception?>? logMessage, long startingTimestamp, Activity? activity, IOBehavior ioBehavior, CancellationToken cancellationToken)
{
var session = createSession();
if (poolId is not null)
{
if (logger.IsEnabled(LogLevel.Debug)) logMessage!(logger, poolId.Value, session.Id, null);
}
else
{
Log.CreatedNonPooledSession(logger, session.Id);
}

string? redirectionUrl;
try
{
redirectionUrl = await session.ConnectAsync(cs, connection, startingTimestamp, loadBalancer, activity, ioBehavior, cancellationToken).ConfigureAwait(false);
}
catch (Exception)
{
await session.DisposeAsync(ioBehavior, default).ConfigureAwait(false);
throw;
}

Exception? redirectionException = null;
var poolPrefix = poolId is not null ? "Pool {PoolId} " : "";
if (redirectionUrl is not null)
{
Log.HasServerRedirectionHeader(logger, poolPrefix, session.Id, redirectionUrl!);
if (cs.ServerRedirectionMode == MySqlServerRedirectionMode.Disabled)
{
Log.ServerRedirectionIsDisabled(logger, poolPrefix);
return session;
}

if (Utility.TryParseRedirectionHeader(redirectionUrl, cs.UserID, out var host, out var port, out var user))
{
if (host != cs.HostNames![0] || port != cs.Port || user != cs.UserID)
{
var redirectedSettings = cs.CloneWith(host, port, user);
Log.OpeningNewConnection(logger, poolPrefix, host, port, user);
var redirectedSession = createSession();
try
{
await redirectedSession.ConnectAsync(redirectedSettings, connection, startingTimestamp, loadBalancer, activity, ioBehavior, cancellationToken).ConfigureAwait(false);
Log.ClosingSessionToUseRedirectedSession(logger, poolPrefix, session.Id, redirectedSession.Id);
await session.DisposeAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
return redirectedSession;
}
catch (Exception ex)
{
redirectionException = ex;
Log.FailedToConnectRedirectedSession(logger, ex, poolPrefix, redirectedSession.Id);
try
{
await redirectedSession.DisposeAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
}
catch (Exception)
{
}
}
}
else
{
Log.SessionAlreadyConnectedToServer(logger, poolPrefix, session.Id);
}
}
}

if (cs.ServerRedirectionMode == MySqlServerRedirectionMode.Required)
{
Log.RequiresServerRedirection(logger, poolPrefix);
throw new MySqlException(MySqlErrorCode.UnableToConnectToHost, "Server does not support redirection", redirectionException);
}
return session;
}

public async Task<bool> TryResetConnectionAsync(ConnectionSettings cs, MySqlConnection connection, IOBehavior ioBehavior, CancellationToken cancellationToken)
{
VerifyState(State.Connected);
Expand Down
28 changes: 14 additions & 14 deletions src/MySqlConnector/Logging/Log.cs
Original file line number Diff line number Diff line change
Expand Up @@ -402,26 +402,26 @@ internal static partial class Log
[LoggerMessage(EventIds.FoundSessionToCleanUp, LogLevel.Debug, "Pool {PoolId} found session {SessionId} to clean up")]
public static partial void FoundSessionToCleanUp(ILogger logger, int poolId, string sessionId);

[LoggerMessage(EventIds.HasServerRedirectionHeader, LogLevel.Trace, "Session {SessionId} has server redirection header {Header}")]
public static partial void HasServerRedirectionHeader(ILogger logger, string sessionId, string header);
[LoggerMessage(EventIds.HasServerRedirectionHeader, LogLevel.Trace, "{poolPrefix}Session {SessionId} has server redirection header {Header}")]
public static partial void HasServerRedirectionHeader(ILogger logger, string poolPrefix, string sessionId, string header);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The log field names are significant, and are used for "structured logging" within Microsoft.Extensions.Logging; see https://learn.microsoft.com/en-us/aspnet/core/fundamentals/logging/?view=aspnetcore-8.0#log-message-template.

We can't just change them for better code reuse (thinking of them like string concatenation).


[LoggerMessage(EventIds.ServerRedirectionIsDisabled, LogLevel.Trace, "Pool {PoolId} server redirection is disabled; ignoring redirection")]
public static partial void ServerRedirectionIsDisabled(ILogger logger, int poolId);
[LoggerMessage(EventIds.ServerRedirectionIsDisabled, LogLevel.Trace, "{poolPrefix}server redirection is disabled; ignoring redirection")]
public static partial void ServerRedirectionIsDisabled(ILogger logger, string poolPrefix);

[LoggerMessage(EventIds.OpeningNewConnection, LogLevel.Debug, "Pool {PoolId} opening new connection to {Host}:{Port} as {User}")]
public static partial void OpeningNewConnection(ILogger logger, int poolId, string host, int port, string user);
[LoggerMessage(EventIds.OpeningNewConnection, LogLevel.Debug, "{poolPrefix}opening new connection to {Host}:{Port} as {User}")]
public static partial void OpeningNewConnection(ILogger logger, string poolPrefix, string host, int port, string user);

[LoggerMessage(EventIds.FailedToConnectRedirectedSession, LogLevel.Information, "Pool {PoolId} failed to connect redirected session {SessionId}")]
public static partial void FailedToConnectRedirectedSession(ILogger logger, Exception ex, int poolId, string sessionId);
[LoggerMessage(EventIds.FailedToConnectRedirectedSession, LogLevel.Information, "{poolPrefix}failed to connect redirected session {SessionId}")]
public static partial void FailedToConnectRedirectedSession(ILogger logger, Exception ex, string poolPrefix, string sessionId);

[LoggerMessage(EventIds.ClosingSessionToUseRedirectedSession, LogLevel.Trace, "Pool {PoolId} closing session {SessionId} to use redirected session {RedirectedSessionId} instead")]
public static partial void ClosingSessionToUseRedirectedSession(ILogger logger, int poolId, string sessionId, string redirectedSessionId);
[LoggerMessage(EventIds.ClosingSessionToUseRedirectedSession, LogLevel.Trace, "{poolPrefix}closing session {SessionId} to use redirected session {RedirectedSessionId} instead")]
public static partial void ClosingSessionToUseRedirectedSession(ILogger logger, string poolPrefix, string sessionId, string redirectedSessionId);

[LoggerMessage(EventIds.SessionAlreadyConnectedToServer, LogLevel.Trace, "Session {SessionId} is already connected to this server; ignoring redirection")]
public static partial void SessionAlreadyConnectedToServer(ILogger logger, string sessionId);
[LoggerMessage(EventIds.SessionAlreadyConnectedToServer, LogLevel.Trace, "{poolPrefix}Session {SessionId} is already connected to this server; ignoring redirection")]
public static partial void SessionAlreadyConnectedToServer(ILogger logger, string poolPrefix, string sessionId);

[LoggerMessage(EventIds.RequiresServerRedirection, LogLevel.Error, "Pool {PoolId} requires server redirection but server doesn't support it")]
public static partial void RequiresServerRedirection(ILogger logger, int poolId);
[LoggerMessage(EventIds.RequiresServerRedirection, LogLevel.Error, "{poolPrefix}new connection requires server redirection but server doesn't support it")]
public static partial void RequiresServerRedirection(ILogger logger, string poolPrefix);

[LoggerMessage(EventIds.CreatedPoolWillNotBeUsed, LogLevel.Debug, "Pool {PoolId} was created but will not be used (due to race)")]
public static partial void CreatedPoolWillNotBeUsed(ILogger logger, int poolId);
Expand Down
21 changes: 5 additions & 16 deletions src/MySqlConnector/MySqlConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,8 @@ public override string ConnectionString
}
}

public string? SessionConnectionString => m_session?.ConnectionString;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this just for tests? We shouldn't add a new public property just to make testing easier; that will imply a commitment to support this.


public override string Database => m_session?.DatabaseOverride ?? GetConnectionSettings().Database;

public override ConnectionState State => m_connectionState;
Expand Down Expand Up @@ -1062,22 +1064,9 @@ private async ValueTask<ServerSession> CreateSessionAsync(ConnectionPool? pool,
// only "fail over" and "random" load balancers supported without connection pooling
var loadBalancer = connectionSettings.LoadBalance == MySqlLoadBalance.Random && connectionSettings.HostNames!.Count > 1 ?
RandomLoadBalancer.Instance : FailOverLoadBalancer.Instance;

var session = new ServerSession(m_logger)
{
OwningConnection = new WeakReference<MySqlConnection>(this),
};
Log.CreatedNonPooledSession(m_logger, session.Id);
try
{
_ = await session.ConnectAsync(connectionSettings, this, startingTimestamp, loadBalancer, activity, actualIOBehavior, connectToken).ConfigureAwait(false);
return session;
}
catch (Exception)
{
await session.DisposeAsync(actualIOBehavior, default).ConfigureAwait(false);
throw;
}
var session = await ServerSession.ConnectAndRedirectAsync(() => new ServerSession(m_logger), m_logger, null, connectionSettings, loadBalancer, this, null, startingTimestamp, null, actualIOBehavior, cancellationToken).ConfigureAwait(false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
var session = await ServerSession.ConnectAndRedirectAsync(() => new ServerSession(m_logger), m_logger, null, connectionSettings, loadBalancer, this, null, startingTimestamp, null, actualIOBehavior, cancellationToken).ConfigureAwait(false);
var session = await ServerSession.ConnectAndRedirectAsync(() => new ServerSession(m_logger), m_logger, null, connectionSettings, loadBalancer, this, null, startingTimestamp, null, actualIOBehavior, connectToken).ConfigureAwait(false);

See line 1073 below.

session.OwningConnection = new WeakReference<MySqlConnection>(this);
return session;
}
}
catch (OperationCanceledException) when (timeoutSource?.IsCancellationRequested is true)
Expand Down
Loading