@@ -14,7 +14,13 @@ public async Task<MySqlSession> GetSessionAsync(IOBehavior ioBehavior, Cancellat
14
14
{
15
15
cancellationToken . ThrowIfCancellationRequested ( ) ;
16
16
17
- // wait for an open slot
17
+ // if all sessions are used, see if any have been leaked and can be recovered
18
+ // check at most once per second (although this isn't enforced via a mutex so multiple threads might block
19
+ // on the lock in RecoverLeakedSessions in high-concurrency situations
20
+ if ( m_sessionSemaphore . CurrentCount == 0 && unchecked ( ( ( uint ) Environment . TickCount ) - m_lastRecoveryTime ) >= 1000u )
21
+ RecoverLeakedSessions ( ) ;
22
+
23
+ // wait for an open slot (until the cancellationToken is cancelled, which is typically due to timeout)
18
24
if ( ioBehavior == IOBehavior . Asynchronous )
19
25
await m_sessionSemaphore . WaitAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
20
26
else
@@ -46,14 +52,19 @@ public async Task<MySqlSession> GetSessionAsync(IOBehavior ioBehavior, Cancellat
46
52
{
47
53
await session . ResetConnectionAsync ( m_connectionSettings , ioBehavior , cancellationToken ) . ConfigureAwait ( false ) ;
48
54
}
55
+
49
56
// pooled session is ready to be used; return it
57
+ lock ( m_leasedSessions )
58
+ m_leasedSessions . Add ( session . Id , new WeakReference < MySqlSession > ( session ) ) ;
50
59
return session ;
51
60
}
52
61
}
53
62
54
63
// create a new session
55
- session = new MySqlSession ( this , m_generation ) ;
64
+ session = new MySqlSession ( this , m_generation , Interlocked . Increment ( ref m_lastId ) ) ;
56
65
await session . ConnectAsync ( m_connectionSettings , ioBehavior , cancellationToken ) . ConfigureAwait ( false ) ;
66
+ lock ( m_leasedSessions )
67
+ m_leasedSessions . Add ( session . Id , new WeakReference < MySqlSession > ( session ) ) ;
57
68
return session ;
58
69
}
59
70
catch
@@ -82,6 +93,8 @@ public void Return(MySqlSession session)
82
93
{
83
94
try
84
95
{
96
+ lock ( m_leasedSessions )
97
+ m_leasedSessions . Remove ( session . Id ) ;
85
98
if ( SessionIsHealthy ( session ) )
86
99
lock ( m_sessions )
87
100
m_sessions . AddFirst ( session ) ;
@@ -103,11 +116,36 @@ public async Task ClearAsync(IOBehavior ioBehavior, CancellationToken cancellati
103
116
104
117
public async Task ReapAsync ( IOBehavior ioBehavior , CancellationToken cancellationToken )
105
118
{
119
+ RecoverLeakedSessions ( ) ;
106
120
if ( m_connectionSettings . ConnectionIdleTimeout == 0 )
107
121
return ;
108
122
await CleanPoolAsync ( ioBehavior , session => ( DateTime . UtcNow - session . LastReturnedUtc ) . TotalSeconds >= m_connectionSettings . ConnectionIdleTimeout , true , cancellationToken ) . ConfigureAwait ( false ) ;
109
123
}
110
124
125
+ /// <summary>
126
+ /// Examines all the <see cref="WeakReference{MySqlSession}"/> in <see cref="m_leasedSessions"/> to determine if any
127
+ /// <see cref="MySqlSession"/> objects have been garbage-collected. If so, assumes that the related <see cref="MySqlConnection"/>
128
+ /// was not properly disposed but the associated server connection has been closed (by the finalizer). Releases the semaphore
129
+ /// once for each leaked session to allow new client connections to be made.
130
+ /// </summary>
131
+ private void RecoverLeakedSessions ( )
132
+ {
133
+ var recoveredIds = new List < int > ( ) ;
134
+ lock ( m_leasedSessions )
135
+ {
136
+ m_lastRecoveryTime = unchecked ( ( uint ) Environment . TickCount ) ;
137
+ foreach ( var pair in m_leasedSessions )
138
+ {
139
+ if ( ! pair . Value . TryGetTarget ( out var _ ) )
140
+ recoveredIds . Add ( pair . Key ) ;
141
+ }
142
+ foreach ( var id in recoveredIds )
143
+ m_leasedSessions . Remove ( id ) ;
144
+ }
145
+ if ( recoveredIds . Count > 0 )
146
+ m_sessionSemaphore . Release ( recoveredIds . Count ) ;
147
+ }
148
+
111
149
private async Task CleanPoolAsync ( IOBehavior ioBehavior , Func < MySqlSession , bool > shouldCleanFn , bool respectMinPoolSize , CancellationToken cancellationToken )
112
150
{
113
151
// synchronize access to this method as only one clean routine should be run at a time
@@ -212,6 +250,7 @@ private ConnectionPool(ConnectionSettings cs)
212
250
m_cleanSemaphore = new SemaphoreSlim ( 1 ) ;
213
251
m_sessionSemaphore = new SemaphoreSlim ( cs . MaximumPoolSize ) ;
214
252
m_sessions = new LinkedList < MySqlSession > ( ) ;
253
+ m_leasedSessions = new Dictionary < int , WeakReference < MySqlSession > > ( ) ;
215
254
}
216
255
217
256
static readonly ConcurrentDictionary < string , ConnectionPool > s_pools = new ConcurrentDictionary < string , ConnectionPool > ( ) ;
@@ -241,5 +280,8 @@ private ConnectionPool(ConnectionSettings cs)
241
280
readonly SemaphoreSlim m_sessionSemaphore ;
242
281
readonly LinkedList < MySqlSession > m_sessions ;
243
282
readonly ConnectionSettings m_connectionSettings ;
283
+ readonly Dictionary < int , WeakReference < MySqlSession > > m_leasedSessions ;
284
+ int m_lastId ;
285
+ uint m_lastRecoveryTime ;
244
286
}
245
287
}
0 commit comments