Skip to content

Commit 0221609

Browse files
committed
reaper implementation
1 parent d68b730 commit 0221609

File tree

2 files changed

+112
-34
lines changed

2 files changed

+112
-34
lines changed

src/MySqlConnector/MySqlClient/ConnectionPool.cs

Lines changed: 106 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,17 @@ public async Task<MySqlSession> GetSessionAsync(IOBehavior ioBehavior, Cancellat
2222

2323
try
2424
{
25-
// check for a pooled session
26-
if (m_sessions.TryDequeue(out var session))
25+
// check for a waiting session
26+
MySqlSession session = null;
27+
lock (m_sessions)
28+
{
29+
if (m_sessions.Count > 0)
30+
{
31+
session = m_sessions.First.Value;
32+
m_sessions.RemoveFirst();
33+
}
34+
}
35+
if (session != null)
2736
{
2837
if (session.PoolGeneration != m_generation || !await session.TryPingAsync(ioBehavior, cancellationToken).ConfigureAwait(false))
2938
{
@@ -42,6 +51,7 @@ public async Task<MySqlSession> GetSessionAsync(IOBehavior ioBehavior, Cancellat
4251
}
4352
}
4453

54+
// create a new session
4555
session = new MySqlSession(this, m_generation);
4656
await session.ConnectAsync(m_connectionSettings, ioBehavior, cancellationToken).ConfigureAwait(false);
4757
return session;
@@ -73,7 +83,8 @@ public void Return(MySqlSession session)
7383
try
7484
{
7585
if (SessionIsHealthy(session))
76-
m_sessions.Enqueue(session);
86+
lock (m_sessions)
87+
m_sessions.AddFirst(session);
7788
else
7889
session.DisposeAsync(IOBehavior.Synchronous, CancellationToken.None).ConfigureAwait(false);
7990
}
@@ -87,45 +98,85 @@ public async Task ClearAsync(IOBehavior ioBehavior, CancellationToken cancellati
8798
{
8899
// increment the generation of the connection pool
89100
Interlocked.Increment(ref m_generation);
101+
await CleanPoolAsync(ioBehavior, session => session.PoolGeneration != m_generation, false, cancellationToken).ConfigureAwait(false);
102+
}
90103

91-
var waitTimeout = TimeSpan.FromMilliseconds(10);
92-
while (true)
104+
public async Task ReapAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
105+
{
106+
if (m_connectionSettings.ConnectionIdleTimeout == 0)
107+
return;
108+
await CleanPoolAsync(ioBehavior, session => (DateTime.UtcNow - session.LastReturnedUtc).TotalSeconds >= m_connectionSettings.ConnectionIdleTimeout, true, cancellationToken).ConfigureAwait(false);
109+
}
110+
111+
private async Task CleanPoolAsync(IOBehavior ioBehavior, Func<MySqlSession, bool> shouldCleanFn, bool respectMinPoolSize, CancellationToken cancellationToken)
112+
{
113+
// synchronize access to this method as only one clean routine should be run at a time
114+
if (ioBehavior == IOBehavior.Asynchronous)
115+
await m_cleanSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
116+
else
117+
m_cleanSemaphore.Wait(cancellationToken);
118+
119+
try
93120
{
94-
// try to get an open slot; if this fails, connection pool is full and sessions will be disposed when returned to pool
95-
if (ioBehavior == IOBehavior.Asynchronous)
96-
{
97-
if (!await m_sessionSemaphore.WaitAsync(waitTimeout, cancellationToken).ConfigureAwait(false))
98-
return;
99-
}
100-
else
121+
var waitTimeout = TimeSpan.FromMilliseconds(10);
122+
while (true)
101123
{
102-
if (!m_sessionSemaphore.Wait(waitTimeout, cancellationToken))
103-
return;
104-
}
124+
// if respectMinPoolSize is true, return if (leased sessions + waiting sessions <= minPoolSize)
125+
if (respectMinPoolSize)
126+
lock (m_sessions)
127+
if (m_connectionSettings.MaximumPoolSize - m_sessionSemaphore.CurrentCount + m_sessions.Count <= m_connectionSettings.MinimumPoolSize)
128+
return;
129+
130+
// try to get an open slot; if this fails, connection pool is full and sessions will be disposed when returned to pool
131+
if (ioBehavior == IOBehavior.Asynchronous)
132+
{
133+
if (!await m_sessionSemaphore.WaitAsync(waitTimeout, cancellationToken).ConfigureAwait(false))
134+
return;
135+
}
136+
else
137+
{
138+
if (!m_sessionSemaphore.Wait(waitTimeout, cancellationToken))
139+
return;
140+
}
105141

106-
try
107-
{
108-
if (m_sessions.TryDequeue(out var session))
142+
try
109143
{
110-
if (session.PoolGeneration != m_generation)
144+
// check for a waiting session
145+
MySqlSession session = null;
146+
lock (m_sessions)
147+
{
148+
if (m_sessions.Count > 0)
149+
{
150+
session = m_sessions.Last.Value;
151+
m_sessions.RemoveLast();
152+
}
153+
}
154+
if (session == null)
155+
return;
156+
157+
if (shouldCleanFn(session))
111158
{
112-
// session generation does not match pool generation; dispose of it and continue iterating
159+
// session should be cleaned; dispose it and keep iterating
113160
await session.DisposeAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
114-
continue;
115161
}
116162
else
117163
{
118-
// session generation matches pool generation; put it back in the queue and stop iterating
119-
m_sessions.Enqueue(session);
164+
// session should not be cleaned; put it back in the queue and stop iterating
165+
lock (m_sessions)
166+
m_sessions.AddLast(session);
167+
return;
120168
}
121169
}
122-
return;
123-
}
124-
finally
125-
{
126-
m_sessionSemaphore.Release();
170+
finally
171+
{
172+
m_sessionSemaphore.Release();
173+
}
127174
}
128175
}
176+
finally
177+
{
178+
m_cleanSemaphore.Release();
179+
}
129180
}
130181

131182
public static ConnectionPool GetPool(ConnectionSettings cs)
@@ -144,25 +195,47 @@ public static ConnectionPool GetPool(ConnectionSettings cs)
144195

145196
public static async Task ClearPoolsAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
146197
{
147-
var pools = new List<ConnectionPool>(s_pools.Values);
148-
149-
foreach (var pool in pools)
198+
foreach (var pool in s_pools.Values)
150199
await pool.ClearAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
151200
}
152201

202+
public static async Task ReapPoolsAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
203+
{
204+
foreach (var pool in s_pools.Values)
205+
await pool.ReapAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
206+
}
207+
153208
private ConnectionPool(ConnectionSettings cs)
154209
{
155210
m_connectionSettings = cs;
156211
m_generation = 0;
212+
m_cleanSemaphore = new SemaphoreSlim(1);
157213
m_sessionSemaphore = new SemaphoreSlim(cs.MaximumPoolSize);
158-
m_sessions = new ConcurrentQueue<MySqlSession>();
214+
m_sessions = new LinkedList<MySqlSession>();
159215
}
160216

161217
static readonly ConcurrentDictionary<string, ConnectionPool> s_pools = new ConcurrentDictionary<string, ConnectionPool>();
218+
static readonly TimeSpan ReaperInterval = TimeSpan.FromMinutes(1);
219+
static readonly Task Reaper = Task.Run(async () => {
220+
while (true)
221+
{
222+
var task = Task.Delay(ReaperInterval);
223+
try
224+
{
225+
await ReapPoolsAsync(IOBehavior.Asynchronous, new CancellationTokenSource(ReaperInterval).Token).ConfigureAwait(false);
226+
}
227+
catch
228+
{
229+
// do nothing; we'll try to reap again
230+
}
231+
await task.ConfigureAwait(false);
232+
}
233+
});
162234

163235
int m_generation;
236+
readonly SemaphoreSlim m_cleanSemaphore;
164237
readonly SemaphoreSlim m_sessionSemaphore;
165-
readonly ConcurrentQueue<MySqlSession> m_sessions;
238+
readonly LinkedList<MySqlSession> m_sessions;
166239
readonly ConnectionSettings m_connectionSettings;
167240
}
168241
}

src/MySqlConnector/Serialization/MySqlSession.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,14 @@ public MySqlSession(ConnectionPool pool, int poolGeneration)
3434
public DateTime CreatedUtc { get; }
3535
public ConnectionPool Pool { get; }
3636
public int PoolGeneration { get; }
37+
public DateTime LastReturnedUtc { get; private set; }
3738
public string DatabaseOverride { get; set; }
3839

39-
public void ReturnToPool() => Pool?.Return(this);
40+
public void ReturnToPool()
41+
{
42+
LastReturnedUtc = DateTime.UtcNow;
43+
Pool?.Return(this);
44+
}
4045

4146
public bool IsConnected => m_state == State.Connected;
4247

0 commit comments

Comments
 (0)