4
4
using System . Threading ;
5
5
using System . Threading . Tasks ;
6
6
using MySql . Data . MySqlClient ;
7
+ using MySqlConnector . Logging ;
7
8
using MySqlConnector . Protocol . Serialization ;
9
+ using MySqlConnector . Utilities ;
8
10
9
11
namespace MySqlConnector . Core
10
12
{
11
13
internal sealed class ConnectionPool
12
14
{
15
+ public int Id { get ; }
16
+
13
17
public async Task < ServerSession > GetSessionAsync ( MySqlConnection connection , IOBehavior ioBehavior , CancellationToken cancellationToken )
14
18
{
15
19
cancellationToken . ThrowIfCancellationRequested ( ) ;
@@ -18,12 +22,16 @@ public async Task<ServerSession> GetSessionAsync(MySqlConnection connection, IOB
18
22
// check at most once per second (although this isn't enforced via a mutex so multiple threads might block
19
23
// on the lock in RecoverLeakedSessions in high-concurrency situations
20
24
if ( m_sessionSemaphore . CurrentCount == 0 && unchecked ( ( ( uint ) Environment . TickCount ) - m_lastRecoveryTime ) >= 1000u )
25
+ {
26
+ Log . Warn ( "{0} is empty; recovering leaked sessions" , m_logArguments ) ;
21
27
RecoverLeakedSessions ( ) ;
28
+ }
22
29
23
30
if ( m_connectionSettings . MinimumPoolSize > 0 )
24
31
await CreateMinimumPooledSessions ( ioBehavior , cancellationToken ) . ConfigureAwait ( false ) ;
25
32
26
33
// wait for an open slot (until the cancellationToken is cancelled, which is typically due to timeout)
34
+ Log . Debug ( "{0} waiting for an available session" , m_logArguments ) ;
27
35
if ( ioBehavior == IOBehavior . Asynchronous )
28
36
await m_sessionSemaphore . WaitAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
29
37
else
@@ -43,10 +51,12 @@ public async Task<ServerSession> GetSessionAsync(MySqlConnection connection, IOB
43
51
}
44
52
if ( session != null )
45
53
{
54
+ Log . Debug ( "{0} found an existing session; checking it for validity" , m_logArguments ) ;
46
55
bool reuseSession ;
47
56
48
57
if ( session . PoolGeneration != m_generation )
49
58
{
59
+ Log . Debug ( "{0} discarding session due to wrong generation" , m_logArguments ) ;
50
60
reuseSession = false ;
51
61
}
52
62
else
@@ -64,26 +74,41 @@ public async Task<ServerSession> GetSessionAsync(MySqlConnection connection, IOB
64
74
if ( ! reuseSession )
65
75
{
66
76
// session is either old or cannot communicate with the server
77
+ Log . Warn ( "{0} Session{1} is unusable; destroying it" , m_logArguments [ 0 ] , session . Id ) ;
67
78
AdjustHostConnectionCount ( session , - 1 ) ;
68
79
await session . DisposeAsync ( ioBehavior , cancellationToken ) . ConfigureAwait ( false ) ;
69
80
}
70
81
else
71
82
{
72
83
// pooled session is ready to be used; return it
73
84
session . OwningConnection = new WeakReference < MySqlConnection > ( connection ) ;
85
+ int leasedSessionsCountPooled ;
74
86
lock ( m_leasedSessions )
87
+ {
75
88
m_leasedSessions . Add ( session . Id , session ) ;
89
+ leasedSessionsCountPooled = m_leasedSessions . Count ;
90
+ }
91
+ if ( Log . IsDebugEnabled ( ) )
92
+ Log . Debug ( "{0} returning pooled Session{1} to caller; m_leasedSessions.Count={2}" , m_logArguments [ 0 ] , session . Id , leasedSessionsCountPooled ) ;
76
93
return session ;
77
94
}
78
95
}
79
96
80
97
// create a new session
81
- session = new ServerSession ( this , m_generation , Interlocked . Increment ( ref m_lastId ) ) ;
98
+ session = new ServerSession ( this , m_generation , Interlocked . Increment ( ref m_lastSessionId ) ) ;
99
+ if ( Log . IsInfoEnabled ( ) )
100
+ Log . Info ( "{0} no pooled session available; created new Session{1}" , m_logArguments [ 0 ] , session . Id ) ;
82
101
await session . ConnectAsync ( m_connectionSettings , m_loadBalancer , ioBehavior , cancellationToken ) . ConfigureAwait ( false ) ;
83
102
AdjustHostConnectionCount ( session , 1 ) ;
84
103
session . OwningConnection = new WeakReference < MySqlConnection > ( connection ) ;
104
+ int leasedSessionsCountNew ;
85
105
lock ( m_leasedSessions )
106
+ {
86
107
m_leasedSessions . Add ( session . Id , session ) ;
108
+ leasedSessionsCountNew = m_leasedSessions . Count ;
109
+ }
110
+ if ( Log . IsDebugEnabled ( ) )
111
+ Log . Debug ( "{0} returning new Session{1} to caller; m_leasedSessions.Count={2}" , m_logArguments [ 0 ] , session . Id , leasedSessionsCountNew ) ;
87
112
return session ;
88
113
}
89
114
catch
@@ -110,6 +135,9 @@ private bool SessionIsHealthy(ServerSession session)
110
135
111
136
public void Return ( ServerSession session )
112
137
{
138
+ if ( Log . IsDebugEnabled ( ) )
139
+ Log . Debug ( "{0} receiving Session{1} back" , m_logArguments [ 0 ] , session . Id ) ;
140
+
113
141
try
114
142
{
115
143
lock ( m_leasedSessions )
@@ -122,6 +150,7 @@ public void Return(ServerSession session)
122
150
}
123
151
else
124
152
{
153
+ Log . Warn ( "{0} received invalid Session{1}; destroying it" , m_logArguments [ 0 ] , session . Id ) ;
125
154
AdjustHostConnectionCount ( session , - 1 ) ;
126
155
session . DisposeAsync ( IOBehavior . Synchronous , CancellationToken . None ) . GetAwaiter ( ) . GetResult ( ) ;
127
156
}
@@ -135,13 +164,15 @@ public void Return(ServerSession session)
135
164
public async Task ClearAsync ( IOBehavior ioBehavior , CancellationToken cancellationToken )
136
165
{
137
166
// increment the generation of the connection pool
167
+ Log . Info ( "{0} clearing connection pool" , m_logArguments ) ;
138
168
Interlocked . Increment ( ref m_generation ) ;
139
169
RecoverLeakedSessions ( ) ;
140
170
await CleanPoolAsync ( ioBehavior , session => session . PoolGeneration != m_generation , false , cancellationToken ) . ConfigureAwait ( false ) ;
141
171
}
142
172
143
173
public async Task ReapAsync ( IOBehavior ioBehavior , CancellationToken cancellationToken )
144
174
{
175
+ Log . Debug ( "{0} reaping connection pool" , m_logArguments ) ;
145
176
RecoverLeakedSessions ( ) ;
146
177
if ( m_connectionSettings . ConnectionIdleTimeout == 0 )
147
178
return ;
@@ -165,6 +196,10 @@ private void RecoverLeakedSessions()
165
196
recoveredSessions . Add ( session ) ;
166
197
}
167
198
}
199
+ if ( recoveredSessions . Count == 0 )
200
+ Log . Debug ( "{0} recovered no sessions" , m_logArguments ) ;
201
+ else
202
+ Log . Warn ( "{0} recovered {1} sessions" , m_logArguments [ 0 ] , recoveredSessions . Count ) ;
168
203
foreach ( var session in recoveredSessions )
169
204
session . ReturnToPool ( ) ;
170
205
}
@@ -218,6 +253,7 @@ private async Task CleanPoolAsync(IOBehavior ioBehavior, Func<ServerSession, boo
218
253
if ( shouldCleanFn ( session ) )
219
254
{
220
255
// session should be cleaned; dispose it and keep iterating
256
+ Log . Info ( "{0} found Session{1} to clean up" , m_logArguments [ 0 ] , session . Id ) ;
221
257
await session . DisposeAsync ( ioBehavior , cancellationToken ) . ConfigureAwait ( false ) ;
222
258
}
223
259
else
@@ -266,7 +302,8 @@ private async Task CreateMinimumPooledSessions(IOBehavior ioBehavior, Cancellati
266
302
267
303
try
268
304
{
269
- var session = new ServerSession ( this , m_generation , Interlocked . Increment ( ref m_lastId ) ) ;
305
+ var session = new ServerSession ( this , m_generation , Interlocked . Increment ( ref m_lastSessionId ) ) ;
306
+ Log . Info ( "{0} created Session{1} to reach minimum pool size" , m_logArguments [ 0 ] , session . Id ) ;
270
307
await session . ConnectAsync ( m_connectionSettings , m_loadBalancer , ioBehavior , cancellationToken ) . ConfigureAwait ( false ) ;
271
308
AdjustHostConnectionCount ( session , 1 ) ;
272
309
lock ( m_sessions )
@@ -343,7 +380,7 @@ private ConnectionPool(ConnectionSettings cs)
343
380
m_cleanSemaphore = new SemaphoreSlim ( 1 ) ;
344
381
m_sessionSemaphore = new SemaphoreSlim ( cs . MaximumPoolSize ) ;
345
382
m_sessions = new LinkedList < ServerSession > ( ) ;
346
- m_leasedSessions = new Dictionary < int , ServerSession > ( ) ;
383
+ m_leasedSessions = new Dictionary < string , ServerSession > ( ) ;
347
384
if ( cs . LoadBalance == MySqlLoadBalance . LeastConnections )
348
385
{
349
386
m_hostSessions = new Dictionary < string , int > ( ) ;
@@ -355,6 +392,14 @@ private ConnectionPool(ConnectionSettings cs)
355
392
cs . LoadBalance == MySqlLoadBalance . Random ? RandomLoadBalancer . Instance :
356
393
cs . LoadBalance == MySqlLoadBalance . LeastConnections ? new LeastConnectionsLoadBalancer ( this ) :
357
394
( ILoadBalancer ) new RoundRobinLoadBalancer ( ) ;
395
+
396
+ Id = Interlocked . Increment ( ref s_poolId ) ;
397
+ m_logArguments = new object [ ] { "Pool{0}" . FormatInvariant ( Id ) } ;
398
+ if ( Log . IsInfoEnabled ( ) )
399
+ {
400
+ var csb = new MySqlConnectionStringBuilder ( cs . ConnectionString ) ;
401
+ Log . Info ( "{0} creating new connection pool for {1}" , m_logArguments [ 0 ] , csb . GetConnectionString ( includePassword : false ) ) ;
402
+ }
358
403
}
359
404
360
405
private void AdjustHostConnectionCount ( ServerSession session , int delta )
@@ -379,6 +424,7 @@ public IEnumerable<string> LoadBalance(IReadOnlyList<string> hosts)
379
424
readonly ConnectionPool m_pool ;
380
425
}
381
426
427
+ static readonly IMySqlConnectorLogger Log = MySqlConnectorLogManager . CreateLogger ( nameof ( ConnectionPool ) ) ;
382
428
static readonly ReaderWriterLockSlim s_poolLock = new ReaderWriterLockSlim ( ) ;
383
429
static readonly Dictionary < string , ConnectionPool > s_pools = new Dictionary < string , ConnectionPool > ( ) ;
384
430
#if DEBUG
@@ -402,15 +448,18 @@ public IEnumerable<string> LoadBalance(IReadOnlyList<string> hosts)
402
448
}
403
449
} ) ;
404
450
451
+ static int s_poolId ;
452
+
405
453
int m_generation ;
406
454
readonly SemaphoreSlim m_cleanSemaphore ;
407
455
readonly SemaphoreSlim m_sessionSemaphore ;
408
456
readonly LinkedList < ServerSession > m_sessions ;
409
457
readonly ConnectionSettings m_connectionSettings ;
410
- readonly Dictionary < int , ServerSession > m_leasedSessions ;
458
+ readonly Dictionary < string , ServerSession > m_leasedSessions ;
411
459
readonly ILoadBalancer m_loadBalancer ;
412
460
readonly Dictionary < string , int > m_hostSessions ;
413
- int m_lastId ;
461
+ readonly object [ ] m_logArguments ;
414
462
uint m_lastRecoveryTime ;
463
+ int m_lastSessionId ;
415
464
}
416
465
}
0 commit comments