Skip to content

Commit ce5fabc

Browse files
committed
Don't serialize returning connections to pool in background.
This was a bottleneck in highly concurrent applications that could generate unacceptably high latencies when waiting for a connection to be reset and returned to the pool. Signed-off-by: Bradley Grainger <[email protected]>
1 parent 88e3c6c commit ce5fabc

File tree

3 files changed

+41
-53
lines changed

3 files changed

+41
-53
lines changed

src/MySqlConnector/Core/BackgroundConnectionResetHelper.cs

Lines changed: 12 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@ internal static class BackgroundConnectionResetHelper
1111
{
1212
public static void AddSession(ServerSession session, MySqlConnection? owningConnection)
1313
{
14-
var resetTask = session.TryResetConnectionAsync(session.Pool!.ConnectionSettings, IOBehavior.Asynchronous, default);
14+
var resetTask = session.TryResetConnectionAsync(session.Pool!.ConnectionSettings, owningConnection, true, IOBehavior.Asynchronous, default);
1515
lock (s_lock)
16-
s_sessions.Add(new SessionResetTask(session, resetTask, owningConnection));
16+
s_resetTasks.Add(resetTask);
1717

1818
if (Log.IsDebugEnabled())
19-
Log.Debug("Started Session{0} reset in background; waiting SessionCount: {1}.", session.Id, s_sessions.Count);
19+
Log.Debug("Started Session{0} reset in background; waiting TaskCount: {1}.", session.Id, s_resetTasks.Count);
2020

2121
// release only if it is likely to succeed
2222
if (s_semaphore.CurrentCount == 0)
@@ -69,7 +69,6 @@ public static async Task ReturnSessionsAsync()
6969
Log.Info("Started BackgroundConnectionResetHelper worker.");
7070

7171
List<Task<bool>> localTasks = new();
72-
List<SessionResetTask> localSessions = new();
7372

7473
// keep running until stopped
7574
while (!s_cancellationTokenSource.IsCancellationRequested)
@@ -85,36 +84,18 @@ public static async Task ReturnSessionsAsync()
8584
{
8685
lock (s_lock)
8786
{
88-
if (s_sessions.Count == 0)
89-
{
90-
if (localTasks.Count == 0)
91-
break;
92-
}
93-
else
94-
{
95-
foreach (var session in s_sessions)
96-
{
97-
localSessions.Add(session);
98-
localTasks.Add(session.ResetTask);
99-
}
100-
s_sessions.Clear();
101-
}
87+
localTasks.AddRange(s_resetTasks);
88+
s_resetTasks.Clear();
10289
}
10390

91+
if (localTasks.Count == 0)
92+
break;
93+
10494
if (Log.IsDebugEnabled())
105-
Log.Debug("Found SessionCount {0} session(s) to return.", localSessions.Count);
95+
Log.Debug("Found TaskCount {0} task(s) to process.", localTasks.Count);
10696

107-
while (localTasks.Count != 0)
108-
{
109-
var completedTask = await Task.WhenAny(localTasks).ConfigureAwait(false);
110-
var index = localTasks.IndexOf(completedTask);
111-
var session = localSessions[index].Session;
112-
var connection = localSessions[index].OwningConnection;
113-
localSessions.RemoveAt(index);
114-
localTasks.RemoveAt(index);
115-
await session.Pool!.ReturnAsync(IOBehavior.Asynchronous, session).ConfigureAwait(false);
116-
GC.KeepAlive(connection);
117-
}
97+
await Task.WhenAll(localTasks);
98+
localTasks.Clear();
11899
}
119100
}
120101
catch (Exception ex) when (!(ex is OperationCanceledException oce && oce.CancellationToken == s_cancellationTokenSource.Token))
@@ -124,25 +105,11 @@ public static async Task ReturnSessionsAsync()
124105
}
125106
}
126107

127-
internal readonly struct SessionResetTask
128-
{
129-
public SessionResetTask(ServerSession session, Task<bool> resetTask, MySqlConnection? owningConnection)
130-
{
131-
Session = session;
132-
ResetTask = resetTask;
133-
OwningConnection = owningConnection;
134-
}
135-
136-
public ServerSession Session { get; }
137-
public Task<bool> ResetTask { get; }
138-
public MySqlConnection? OwningConnection { get; }
139-
}
140-
141108
static readonly IMySqlConnectorLogger Log = MySqlConnectorLogManager.CreateLogger(nameof(BackgroundConnectionResetHelper));
142109
static readonly object s_lock = new();
143110
static readonly SemaphoreSlim s_semaphore = new(1, 1);
144111
static readonly CancellationTokenSource s_cancellationTokenSource = new();
145-
static readonly List<SessionResetTask> s_sessions = new();
112+
static readonly List<Task<bool>> s_resetTasks = new();
146113
static Task? s_workerTask;
147114
}
148115
}

src/MySqlConnector/Core/ConnectionPool.cs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public async ValueTask<ServerSession> GetSessionAsync(MySqlConnection connection
6868
{
6969
if ((ConnectionSettings.ConnectionReset && ConnectionSettings.DeferConnectionReset) || session.DatabaseOverride is not null)
7070
{
71-
reuseSession = await session.TryResetConnectionAsync(ConnectionSettings, ioBehavior, cancellationToken).ConfigureAwait(false);
71+
reuseSession = await session.TryResetConnectionAsync(ConnectionSettings, null, false, ioBehavior, cancellationToken).ConfigureAwait(false);
7272
}
7373
else if ((unchecked((uint) Environment.TickCount) - session.LastReturnedTicks) >= ConnectionSettings.ConnectionIdlePingTime)
7474
{
@@ -158,7 +158,11 @@ private int GetSessionHealth(ServerSession session)
158158
return 0;
159159
}
160160

161-
public async Task ReturnAsync(IOBehavior ioBehavior, ServerSession session)
161+
#if NET45 || NET461 || NET471 || NETSTANDARD1_3 || NETSTANDARD2_0
162+
public async ValueTask<int> ReturnAsync(IOBehavior ioBehavior, ServerSession session)
163+
#else
164+
public async ValueTask ReturnAsync(IOBehavior ioBehavior, ServerSession session)
165+
#endif
162166
{
163167
if (Log.IsDebugEnabled())
164168
Log.Debug("Pool{0} receiving Session{1} back", m_logArguments[0], session.Id);
@@ -188,6 +192,10 @@ public async Task ReturnAsync(IOBehavior ioBehavior, ServerSession session)
188192
{
189193
m_sessionSemaphore.Release();
190194
}
195+
196+
#if NET45 || NET461 || NET471 || NETSTANDARD1_3 || NETSTANDARD2_0
197+
return default;
198+
#endif
191199
}
192200

193201
public async Task ClearAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)

src/MySqlConnector/Core/ServerSession.cs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,11 @@ public ServerSession(ConnectionPool? pool, int poolGeneration, int id)
6868
public bool SupportsSessionTrack => m_supportsSessionTrack;
6969
public bool ProcAccessDenied { get; set; }
7070

71-
public Task ReturnToPoolAsync(IOBehavior ioBehavior, MySqlConnection? owningConnection)
71+
#if NET45 || NET461 || NET471 || NETSTANDARD1_3 || NETSTANDARD2_0
72+
public ValueTask<int> ReturnToPoolAsync(IOBehavior ioBehavior, MySqlConnection? owningConnection)
73+
#else
74+
public ValueTask ReturnToPoolAsync(IOBehavior ioBehavior, MySqlConnection? owningConnection)
75+
#endif
7276
{
7377
if (Log.IsDebugEnabled())
7478
{
@@ -77,11 +81,11 @@ public Task ReturnToPoolAsync(IOBehavior ioBehavior, MySqlConnection? owningConn
7781
}
7882
LastReturnedTicks = unchecked((uint) Environment.TickCount);
7983
if (Pool is null)
80-
return Utility.CompletedTask;
84+
return default;
8185
if (!Pool.ConnectionSettings.ConnectionReset || Pool.ConnectionSettings.DeferConnectionReset)
8286
return Pool.ReturnAsync(ioBehavior, this);
8387
BackgroundConnectionResetHelper.AddSession(this, owningConnection);
84-
return Utility.CompletedTask;
88+
return default;
8589
}
8690

8791
public bool IsConnected
@@ -529,10 +533,11 @@ public async Task DisposeAsync(IOBehavior ioBehavior, CancellationToken cancella
529533
return statusInfo;
530534
}
531535

532-
public async Task<bool> TryResetConnectionAsync(ConnectionSettings cs, IOBehavior ioBehavior, CancellationToken cancellationToken)
536+
public async Task<bool> TryResetConnectionAsync(ConnectionSettings cs, MySqlConnection? owningConnection, bool returnToPool, IOBehavior ioBehavior, CancellationToken cancellationToken)
533537
{
534538
VerifyState(State.Connected);
535539

540+
var success = false;
536541
try
537542
{
538543
// clear all prepared statements; resetting the connection will clear them on the server
@@ -578,7 +583,7 @@ public async Task<bool> TryResetConnectionAsync(ConnectionSettings cs, IOBehavio
578583
payload = await ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
579584
OkPayload.Create(payload.Span, SupportsDeprecateEof, SupportsSessionTrack);
580585

581-
return true;
586+
success = true;
582587
}
583588
catch (IOException ex)
584589
{
@@ -597,7 +602,15 @@ public async Task<bool> TryResetConnectionAsync(ConnectionSettings cs, IOBehavio
597602
Log.Debug(ex, "Session{0} ignoring SocketException in TryResetConnectionAsync", m_logArguments);
598603
}
599604

600-
return false;
605+
if (returnToPool && Pool is not null)
606+
{
607+
await Pool.ReturnAsync(ioBehavior, this).ConfigureAwait(false);
608+
609+
// make sure the MySqlConnection is kept alive until the session is returned to the pool; this prevents it from potentially being detected as "leaked"
610+
GC.KeepAlive(owningConnection);
611+
}
612+
613+
return success;
601614
}
602615

603616
private async Task<PayloadData> SwitchAuthenticationAsync(ConnectionSettings cs, PayloadData payload, IOBehavior ioBehavior, CancellationToken cancellationToken)

0 commit comments

Comments
 (0)