Skip to content
12 changes: 6 additions & 6 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -187,27 +187,27 @@ jobs:
'MySQL 8.0':
image: 'mysql:8.0'
connectionStringExtra: 'AllowPublicKeyRetrieval=True'
unsupportedFeatures: 'Ed25519,StreamingResults,Tls11,ZeroDateTime'
unsupportedFeatures: 'Ed25519,StreamingResults,Tls11,ZeroDateTime,Redirection'
'MySQL 8.4':
image: 'mysql:8.4'
connectionStringExtra: 'AllowPublicKeyRetrieval=True'
unsupportedFeatures: 'Ed25519,StreamingResults,Tls11,ZeroDateTime'
unsupportedFeatures: 'Ed25519,StreamingResults,Tls11,ZeroDateTime,Redirection'
'MySQL 9.0':
image: 'mysql:9.0'
connectionStringExtra: 'AllowPublicKeyRetrieval=True'
unsupportedFeatures: 'Ed25519,StreamingResults,Tls11,ZeroDateTime'
unsupportedFeatures: 'Ed25519,StreamingResults,Tls11,ZeroDateTime,Redirection'
'MariaDB 10.6':
image: 'mariadb:10.6'
connectionStringExtra: ''
unsupportedFeatures: 'CachingSha2Password,CancelSleepSuccessfully,Json,RoundDateTime,QueryAttributes,Sha256Password,Tls11,UuidToBin'
unsupportedFeatures: 'CachingSha2Password,CancelSleepSuccessfully,Json,RoundDateTime,QueryAttributes,Sha256Password,Tls11,UuidToBin,Redirection'
'MariaDB 10.11':
image: 'mariadb:10.11'
connectionStringExtra: ''
unsupportedFeatures: 'CachingSha2Password,CancelSleepSuccessfully,Json,RoundDateTime,QueryAttributes,Sha256Password,Tls11,UuidToBin'
unsupportedFeatures: 'CachingSha2Password,CancelSleepSuccessfully,Json,RoundDateTime,QueryAttributes,Sha256Password,Tls11,UuidToBin,Redirection'
'MariaDB 11.4':
image: 'mariadb:11.4'
connectionStringExtra: ''
unsupportedFeatures: 'CachingSha2Password,CancelSleepSuccessfully,Json,RoundDateTime,QueryAttributes,Sha256Password,Tls11,UuidToBin'
unsupportedFeatures: 'CachingSha2Password,CancelSleepSuccessfully,Json,RoundDateTime,QueryAttributes,Sha256Password,Tls11,UuidToBin,Redirection'
steps:
- template: '.ci/integration-tests-steps.yml'
parameters:
Expand Down
98 changes: 18 additions & 80 deletions src/MySqlConnector/Core/ConnectionPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,11 @@ public async ValueTask<ServerSession> GetSessionAsync(MySqlConnection connection
if (ConnectionSettings.ConnectionReset || session.DatabaseOverride is not null)
{
if (timeoutMilliseconds != 0)
session.SetTimeout(Math.Max(1, timeoutMilliseconds - Utility.GetElapsedMilliseconds(startingTimestamp)));
reuseSession = await session.TryResetConnectionAsync(ConnectionSettings, connection, ioBehavior, cancellationToken).ConfigureAwait(false);
session.SetTimeout(Math.Max(1,
timeoutMilliseconds - Utility.GetElapsedMilliseconds(startingTimestamp)));
reuseSession = await session
.TryResetConnectionAsync(ConnectionSettings, connection, ioBehavior, cancellationToken)
.ConfigureAwait(false);
session.SetTimeout(Constants.InfiniteTimeout);
}
else
Expand All @@ -95,18 +98,24 @@ public async ValueTask<ServerSession> GetSessionAsync(MySqlConnection connection
m_leasedSessions.Add(session.Id, session);
leasedSessionsCountPooled = m_leasedSessions.Count;
}

MetricsReporter.AddUsed(this);
ActivitySourceHelper.CopyTags(session.ActivityTags, activity);
Log.ReturningPooledSession(m_logger, Id, session.Id, leasedSessionsCountPooled);

session.LastLeasedTimestamp = Stopwatch.GetTimestamp();
MetricsReporter.RecordWaitTime(this, Utility.GetElapsedSeconds(startingTimestamp, session.LastLeasedTimestamp));
MetricsReporter.RecordWaitTime(this,
Utility.GetElapsedSeconds(startingTimestamp, session.LastLeasedTimestamp));
return session;
}
}

// create a new session
session = await ConnectSessionAsync(connection, s_createdNewSession, startingTimestamp, activity, ioBehavior, cancellationToken).ConfigureAwait(false);
session = await ServerSession.ConnectAndRedirectAsync(
() => new ServerSession(m_connectionLogger, this, m_generation,
Interlocked.Increment(ref m_lastSessionId)), m_logger, Id, ConnectionSettings, m_loadBalancer,
connection, s_createdNewSession, startingTimestamp, activity, ioBehavior, cancellationToken)
.ConfigureAwait(false);
AdjustHostConnectionCount(session, 1);
session.OwningConnection = new(connection);
int leasedSessionsCountNew;
Expand Down Expand Up @@ -402,7 +411,11 @@ private async Task CreateMinimumPooledSessions(MySqlConnection connection, IOBeh

try
{
var session = await ConnectSessionAsync(connection, s_createdToReachMinimumPoolSize, Stopwatch.GetTimestamp(), null, ioBehavior, cancellationToken).ConfigureAwait(false);
var session = await ServerSession.ConnectAndRedirectAsync(
() => new ServerSession(m_connectionLogger, this, m_generation,
Interlocked.Increment(ref m_lastSessionId)), m_logger, Id, ConnectionSettings, m_loadBalancer,
connection, s_createdToReachMinimumPoolSize, Stopwatch.GetTimestamp(), null, ioBehavior,
cancellationToken).ConfigureAwait(false);
AdjustHostConnectionCount(session, 1);
lock (m_sessions)
_ = m_sessions.AddFirst(session);
Expand All @@ -416,81 +429,6 @@ private async Task CreateMinimumPooledSessions(MySqlConnection connection, IOBeh
}
}

private async ValueTask<ServerSession> ConnectSessionAsync(MySqlConnection connection, Action<ILogger, int, string, Exception?> logMessage, long startingTimestamp, Activity? activity, IOBehavior ioBehavior, CancellationToken cancellationToken)
{
var session = new ServerSession(m_connectionLogger, this, m_generation, Interlocked.Increment(ref m_lastSessionId));
if (m_logger.IsEnabled(LogLevel.Debug))
logMessage(m_logger, Id, session.Id, null);
string? statusInfo;
try
{
statusInfo = await session.ConnectAsync(ConnectionSettings, connection, startingTimestamp, m_loadBalancer, activity, ioBehavior, cancellationToken).ConfigureAwait(false);
}
catch (Exception)
{
await session.DisposeAsync(ioBehavior, default).ConfigureAwait(false);
throw;
}

Exception? redirectionException = null;
if (statusInfo is not null && statusInfo.StartsWith("Location: mysql://", StringComparison.Ordinal))
{
// server redirection string has the format "Location: mysql://{host}:{port}/user={userId}[&ttl={ttl}]"
Log.HasServerRedirectionHeader(m_logger, session.Id, statusInfo);

if (ConnectionSettings.ServerRedirectionMode == MySqlServerRedirectionMode.Disabled)
{
Log.ServerRedirectionIsDisabled(m_logger, Id);
}
else if (Utility.TryParseRedirectionHeader(statusInfo, out var host, out var port, out var user))
{
if (host != ConnectionSettings.HostNames![0] || port != ConnectionSettings.Port || user != ConnectionSettings.UserID)
{
var redirectedSettings = ConnectionSettings.CloneWith(host, port, user);
Log.OpeningNewConnection(m_logger, Id, host, port, user);
var redirectedSession = new ServerSession(m_connectionLogger, this, m_generation, Interlocked.Increment(ref m_lastSessionId));
try
{
_ = await redirectedSession.ConnectAsync(redirectedSettings, connection, startingTimestamp, m_loadBalancer, activity, ioBehavior, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
Log.FailedToConnectRedirectedSession(m_logger, ex, Id, redirectedSession.Id);
redirectionException = ex;
}

if (redirectionException is null)
{
Log.ClosingSessionToUseRedirectedSession(m_logger, Id, session.Id, redirectedSession.Id);
await session.DisposeAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
return redirectedSession;
}
else
{
try
{
await redirectedSession.DisposeAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
}
catch (Exception)
{
}
}
}
else
{
Log.SessionAlreadyConnectedToServer(m_logger, session.Id);
}
}
}

if (ConnectionSettings.ServerRedirectionMode == MySqlServerRedirectionMode.Required)
{
Log.RequiresServerRedirection(m_logger, Id);
throw new MySqlException(MySqlErrorCode.UnableToConnectToHost, "Server does not support redirection", redirectionException);
}
return session;
}

public static ConnectionPool? CreatePool(string connectionString, MySqlConnectorLoggingConfiguration loggingConfiguration, string? name)
{
// parse connection string and check for 'Pooling' setting; return 'null' if pooling is disabled
Expand Down
8 changes: 6 additions & 2 deletions src/MySqlConnector/Core/ConnectionSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,12 @@ public int ConnectionTimeoutMilliseconds

private ConnectionSettings(ConnectionSettings other, string host, int port, string userId)
{
ConnectionStringBuilder = other.ConnectionStringBuilder;
ConnectionString = other.ConnectionString;
ConnectionStringBuilder = new MySqlConnectionStringBuilder(other.ConnectionString);
ConnectionStringBuilder.Port = (uint)port;
ConnectionStringBuilder.Server = host;
ConnectionStringBuilder.UserID = userId;

ConnectionString = ConnectionStringBuilder.ConnectionString;

ConnectionProtocol = MySqlConnectionProtocol.Sockets;
HostNames = [host];
Expand Down
81 changes: 75 additions & 6 deletions src/MySqlConnector/Core/ServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public ServerSession(ILogger logger, ConnectionPool? pool, int poolGeneration, i
public bool SupportsPerQueryVariables => ServerVersion.IsMariaDb && ServerVersion.Version >= ServerVersions.MariaDbSupportsPerQueryVariables;
public int ActiveCommandId { get; private set; }
public int CancellationTimeout { get; private set; }
public string? ConnectionString { get; private set; }
public int ConnectionId { get; set; }
public byte[]? AuthPluginData { get; set; }
public long CreatedTimestamp { get; }
Expand Down Expand Up @@ -391,7 +392,7 @@ public async Task DisposeAsync(IOBehavior ioBehavior, CancellationToken cancella
m_state = State.Closed;
}

public async Task<string?> ConnectAsync(ConnectionSettings cs, MySqlConnection connection, long startingTimestamp, ILoadBalancer? loadBalancer, Activity? activity, IOBehavior ioBehavior, CancellationToken cancellationToken)
private async Task<string?> ConnectAsync(ConnectionSettings cs, MySqlConnection connection, long startingTimestamp, ILoadBalancer? loadBalancer, Activity? activity, IOBehavior ioBehavior, CancellationToken cancellationToken)
{
try
{
Expand All @@ -403,16 +404,16 @@ public async Task DisposeAsync(IOBehavior ioBehavior, CancellationToken cancella

// set activity tags
{
var connectionString = cs.ConnectionStringBuilder.GetConnectionString(cs.ConnectionStringBuilder.PersistSecurityInfo);
ConnectionString = cs.ConnectionStringBuilder.GetConnectionString(cs.ConnectionStringBuilder.PersistSecurityInfo);
m_activityTags.Add(ActivitySourceHelper.DatabaseSystemTagName, ActivitySourceHelper.DatabaseSystemValue);
m_activityTags.Add(ActivitySourceHelper.DatabaseConnectionStringTagName, connectionString);
m_activityTags.Add(ActivitySourceHelper.DatabaseConnectionStringTagName, ConnectionString);
m_activityTags.Add(ActivitySourceHelper.DatabaseUserTagName, cs.UserID);
if (cs.Database.Length != 0)
m_activityTags.Add(ActivitySourceHelper.DatabaseNameTagName, cs.Database);
if (activity is { IsAllDataRequested: true })
{
activity.SetTag(ActivitySourceHelper.DatabaseSystemTagName, ActivitySourceHelper.DatabaseSystemValue)
.SetTag(ActivitySourceHelper.DatabaseConnectionStringTagName, connectionString)
.SetTag(ActivitySourceHelper.DatabaseConnectionStringTagName, ConnectionString)
.SetTag(ActivitySourceHelper.DatabaseUserTagName, cs.UserID);
if (cs.Database.Length != 0)
activity.SetTag(ActivitySourceHelper.DatabaseNameTagName, cs.Database);
Expand Down Expand Up @@ -533,7 +534,7 @@ public async Task DisposeAsync(IOBehavior ioBehavior, CancellationToken cancella
}

var ok = OkPayload.Create(payload.Span, this);
var statusInfo = ok.StatusInfo;
var redirectionUrl = ok.RedirectionUrl;

if (m_useCompression)
m_payloadHandler = new CompressedPayloadHandler(m_payloadHandler.ByteHandler);
Expand All @@ -558,7 +559,7 @@ public async Task DisposeAsync(IOBehavior ioBehavior, CancellationToken cancella
}

m_payloadHandler.ByteHandler.RemainingTimeout = Constants.InfiniteTimeout;
return statusInfo;
return redirectionUrl;
}
catch (ArgumentException ex)
{
Expand All @@ -572,6 +573,74 @@ public async Task DisposeAsync(IOBehavior ioBehavior, CancellationToken cancella
}
}

public static async ValueTask<ServerSession> ConnectAndRedirectAsync(Func<ServerSession> createSession, ILogger logger, int? poolId, ConnectionSettings cs, ILoadBalancer? loadBalancer, MySqlConnection connection, Action<ILogger, int, string, Exception?>? logMessage, long startingTimestamp, Activity? activity, IOBehavior ioBehavior, CancellationToken cancellationToken)
{
var session = createSession();
if (poolId is not null && logger.IsEnabled(LogLevel.Debug)) logMessage!(logger, poolId.Value, session.Id, null);

string? redirectionUrl;
try
{
redirectionUrl = await session.ConnectAsync(cs, connection, startingTimestamp, loadBalancer, activity, ioBehavior, cancellationToken).ConfigureAwait(false);
}
catch (Exception)
{
await session.DisposeAsync(ioBehavior, default).ConfigureAwait(false);
throw;
}

Exception? redirectionException = null;
if (redirectionUrl is not null)
{
Log.HasServerRedirectionHeader(logger, session.Id, redirectionUrl);
if (cs.ServerRedirectionMode == MySqlServerRedirectionMode.Disabled)
{
Log.ServerRedirectionIsDisabled(logger, session.Id);
return session;
}

if (Utility.TryParseRedirectionHeader(redirectionUrl, cs.UserID, out var host, out var port, out var user))
{
if (host != cs.HostNames![0] || port != cs.Port || user != cs.UserID)
{
var redirectedSettings = cs.CloneWith(host, port, user);
Log.OpeningNewConnection(logger, host, port, user);
var redirectedSession = createSession();
try
{
await redirectedSession.ConnectAsync(redirectedSettings, connection, startingTimestamp, loadBalancer, activity, ioBehavior, cancellationToken).ConfigureAwait(false);
Log.ClosingSessionToUseRedirectedSession(logger, session.Id, redirectedSession.Id);
await session.DisposeAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
return redirectedSession;
}
catch (Exception ex)
{
redirectionException = ex;
Log.FailedToConnectRedirectedSession(logger, ex, redirectedSession.Id);
try
{
await redirectedSession.DisposeAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
}
catch (Exception)
{
}
}
}
else
{
Log.SessionAlreadyConnectedToServer(logger, session.Id);
}
}
}

if (cs.ServerRedirectionMode == MySqlServerRedirectionMode.Required)
{
Log.RequiresServerRedirection(logger, session.Id);
throw new MySqlException(MySqlErrorCode.UnableToConnectToHost, "Server does not support redirection", redirectionException);
}
return session;
}

public async Task<bool> TryResetConnectionAsync(ConnectionSettings cs, MySqlConnection connection, IOBehavior ioBehavior, CancellationToken cancellationToken)
{
VerifyState(State.Connected);
Expand Down
20 changes: 10 additions & 10 deletions src/MySqlConnector/Logging/Log.cs
Original file line number Diff line number Diff line change
Expand Up @@ -405,23 +405,23 @@ internal static partial class Log
[LoggerMessage(EventIds.HasServerRedirectionHeader, LogLevel.Trace, "Session {SessionId} has server redirection header {Header}")]
public static partial void HasServerRedirectionHeader(ILogger logger, string sessionId, string header);

[LoggerMessage(EventIds.ServerRedirectionIsDisabled, LogLevel.Trace, "Pool {PoolId} server redirection is disabled; ignoring redirection")]
public static partial void ServerRedirectionIsDisabled(ILogger logger, int poolId);
[LoggerMessage(EventIds.ServerRedirectionIsDisabled, LogLevel.Trace, "Session {SessionId}, server redirection is disabled; ignoring redirection")]
public static partial void ServerRedirectionIsDisabled(ILogger logger, string sessionId);

[LoggerMessage(EventIds.OpeningNewConnection, LogLevel.Debug, "Pool {PoolId} opening new connection to {Host}:{Port} as {User}")]
public static partial void OpeningNewConnection(ILogger logger, int poolId, string host, int port, string user);
[LoggerMessage(EventIds.OpeningNewConnection, LogLevel.Debug, "opening new connection to {Host}:{Port} as {User}")]
public static partial void OpeningNewConnection(ILogger logger, string host, int port, string user);

[LoggerMessage(EventIds.FailedToConnectRedirectedSession, LogLevel.Information, "Pool {PoolId} failed to connect redirected session {SessionId}")]
public static partial void FailedToConnectRedirectedSession(ILogger logger, Exception ex, int poolId, string sessionId);
[LoggerMessage(EventIds.FailedToConnectRedirectedSession, LogLevel.Information, "failed to connect redirected session {SessionId}")]
public static partial void FailedToConnectRedirectedSession(ILogger logger, Exception ex, string sessionId);

[LoggerMessage(EventIds.ClosingSessionToUseRedirectedSession, LogLevel.Trace, "Pool {PoolId} closing session {SessionId} to use redirected session {RedirectedSessionId} instead")]
public static partial void ClosingSessionToUseRedirectedSession(ILogger logger, int poolId, string sessionId, string redirectedSessionId);
[LoggerMessage(EventIds.ClosingSessionToUseRedirectedSession, LogLevel.Trace, "closing session {SessionId} to use redirected session {RedirectedSessionId} instead")]
public static partial void ClosingSessionToUseRedirectedSession(ILogger logger, string sessionId, string redirectedSessionId);

[LoggerMessage(EventIds.SessionAlreadyConnectedToServer, LogLevel.Trace, "Session {SessionId} is already connected to this server; ignoring redirection")]
public static partial void SessionAlreadyConnectedToServer(ILogger logger, string sessionId);

[LoggerMessage(EventIds.RequiresServerRedirection, LogLevel.Error, "Pool {PoolId} requires server redirection but server doesn't support it")]
public static partial void RequiresServerRedirection(ILogger logger, int poolId);
[LoggerMessage(EventIds.RequiresServerRedirection, LogLevel.Error, "Session {SessionId}, new connection requires server redirection but server doesn't support it")]
public static partial void RequiresServerRedirection(ILogger logger, string SessionId);

[LoggerMessage(EventIds.CreatedPoolWillNotBeUsed, LogLevel.Debug, "Pool {PoolId} was created but will not be used (due to race)")]
public static partial void CreatedPoolWillNotBeUsed(ILogger logger, int poolId);
Expand Down
20 changes: 5 additions & 15 deletions src/MySqlConnector/MySqlConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,8 @@ public override string ConnectionString
}
}

public string? SessionConnectionString => m_session?.ConnectionString;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this just for tests? We shouldn't add a new public property just to make testing easier; that will imply a commitment to support this.


public override string Database => m_session?.DatabaseOverride ?? GetConnectionSettings().Database;

public override ConnectionState State => m_connectionState;
Expand Down Expand Up @@ -1062,22 +1064,10 @@ private async ValueTask<ServerSession> CreateSessionAsync(ConnectionPool? pool,
// only "fail over" and "random" load balancers supported without connection pooling
var loadBalancer = connectionSettings.LoadBalance == MySqlLoadBalance.Random && connectionSettings.HostNames!.Count > 1 ?
RandomLoadBalancer.Instance : FailOverLoadBalancer.Instance;

var session = new ServerSession(m_logger)
{
OwningConnection = new WeakReference<MySqlConnection>(this),
};
var session = await ServerSession.ConnectAndRedirectAsync(() => new ServerSession(m_logger), m_logger, null, connectionSettings, loadBalancer, this, null, startingTimestamp, null, actualIOBehavior, connectToken).ConfigureAwait(false);
session.OwningConnection = new WeakReference<MySqlConnection>(this);
Log.CreatedNonPooledSession(m_logger, session.Id);
try
{
_ = await session.ConnectAsync(connectionSettings, this, startingTimestamp, loadBalancer, activity, actualIOBehavior, connectToken).ConfigureAwait(false);
return session;
}
catch (Exception)
{
await session.DisposeAsync(actualIOBehavior, default).ConfigureAwait(false);
throw;
}
return session;
}
}
catch (OperationCanceledException) when (timeoutSource?.IsCancellationRequested is true)
Expand Down
Loading