@@ -54,6 +54,9 @@ internal sealed class ExclusiveConnectionPool : IConnectionPool
54
54
private readonly Action < ConnectionPoolOpenedEvent > _openedEventHandler ;
55
55
private readonly Action < ConnectionPoolClosingEvent > _closingEventHandler ;
56
56
private readonly Action < ConnectionPoolClosedEvent > _closedEventHandler ;
57
+ private readonly Action < ConnectionPoolClearingEvent > _clearingEventHandler ;
58
+ private readonly Action < ConnectionPoolClearedEvent > _clearedEventHandler ;
59
+ private readonly Action < ConnectionCreatedEvent > _connectionCreatedEventHandler ;
57
60
58
61
// constructors
59
62
public ExclusiveConnectionPool (
@@ -71,7 +74,9 @@ public ExclusiveConnectionPool(
71
74
72
75
_connectionHolder = new ListConnectionHolder ( eventSubscriber ) ;
73
76
_poolQueue = new WaitQueue ( settings . MaxConnections ) ;
77
+ #pragma warning disable 618
74
78
_waitQueue = new SemaphoreSlim ( settings . WaitQueueSize ) ;
79
+ #pragma warning restore 618
75
80
_maintenanceCancellationTokenSource = new CancellationTokenSource ( ) ;
76
81
_state = new InterlockedInt32 ( State . Initial ) ;
77
82
@@ -88,6 +93,9 @@ public ExclusiveConnectionPool(
88
93
eventSubscriber . TryGetEventHandler ( out _closedEventHandler ) ;
89
94
eventSubscriber . TryGetEventHandler ( out _addingConnectionEventHandler ) ;
90
95
eventSubscriber . TryGetEventHandler ( out _addedConnectionEventHandler ) ;
96
+ eventSubscriber . TryGetEventHandler ( out _clearingEventHandler ) ;
97
+ eventSubscriber . TryGetEventHandler ( out _clearedEventHandler ) ;
98
+ eventSubscriber . TryGetEventHandler ( out _connectionCreatedEventHandler ) ;
91
99
}
92
100
93
101
// properties
@@ -140,12 +148,12 @@ public int UsedCount
140
148
// public methods
141
149
public IConnectionHandle AcquireConnection ( CancellationToken cancellationToken )
142
150
{
143
- ThrowIfNotOpen ( ) ;
144
-
145
151
var helper = new AcquireConnectionHelper ( this ) ;
146
152
try
147
153
{
148
154
helper . CheckingOutConnection ( ) ;
155
+ ThrowIfNotOpen ( ) ;
156
+ helper . EnterWaitQueue ( ) ;
149
157
var enteredPool = _poolQueue . Wait ( _settings . WaitQueueTimeout , cancellationToken ) ;
150
158
return helper . EnteredPool ( enteredPool ) ;
151
159
}
@@ -162,12 +170,12 @@ public IConnectionHandle AcquireConnection(CancellationToken cancellationToken)
162
170
163
171
public async Task < IConnectionHandle > AcquireConnectionAsync ( CancellationToken cancellationToken )
164
172
{
165
- ThrowIfNotOpen ( ) ;
166
-
167
173
var helper = new AcquireConnectionHelper ( this ) ;
168
174
try
169
175
{
170
176
helper . CheckingOutConnection ( ) ;
177
+ ThrowIfNotOpen ( ) ;
178
+ helper . EnterWaitQueue ( ) ;
171
179
var enteredPool = await _poolQueue . WaitAsync ( _settings . WaitQueueTimeout , cancellationToken ) . ConfigureAwait ( false ) ;
172
180
return helper . EnteredPool ( enteredPool ) ;
173
181
}
@@ -185,13 +193,20 @@ public async Task<IConnectionHandle> AcquireConnectionAsync(CancellationToken ca
185
193
public void Clear ( )
186
194
{
187
195
ThrowIfNotOpen ( ) ;
196
+
197
+ _clearingEventHandler ? . Invoke ( new ConnectionPoolClearingEvent ( _serverId , _settings ) ) ;
198
+
188
199
Interlocked . Increment ( ref _generation ) ;
200
+
201
+ _clearedEventHandler ? . Invoke ( new ConnectionPoolClearedEvent ( _serverId , _settings ) ) ;
189
202
}
190
203
191
204
private PooledConnection CreateNewConnection ( )
192
205
{
193
206
var connection = _connectionFactory . CreateConnection ( _serverId , _endPoint ) ;
194
- return new PooledConnection ( this , connection ) ;
207
+ var pooledConnection = new PooledConnection ( this , connection ) ;
208
+ _connectionCreatedEventHandler ? . Invoke ( new ConnectionCreatedEvent ( connection . ConnectionId , connection . Settings , EventContext . OperationId ) ) ;
209
+ return pooledConnection ;
195
210
}
196
211
197
212
public void Initialize ( )
@@ -204,12 +219,12 @@ public void Initialize()
204
219
_openingEventHandler ( new ConnectionPoolOpeningEvent ( _serverId , _settings ) ) ;
205
220
}
206
221
207
- MaintainSizeAsync ( ) . ConfigureAwait ( false ) ;
208
-
209
222
if ( _openedEventHandler != null )
210
223
{
211
224
_openedEventHandler ( new ConnectionPoolOpenedEvent ( _serverId , _settings ) ) ;
212
225
}
226
+
227
+ MaintainSizeAsync ( ) . ConfigureAwait ( false ) ;
213
228
}
214
229
}
215
230
@@ -335,25 +350,28 @@ private async Task EnsureMinSizeAsync(CancellationToken cancellationToken)
335
350
336
351
private void ReleaseConnection ( PooledConnection connection )
337
352
{
338
- if ( _state . Value == State . Disposed )
353
+ if ( _checkingInConnectionEventHandler != null )
339
354
{
340
- connection . Dispose ( ) ;
341
- return ;
355
+ _checkingInConnectionEventHandler ( new ConnectionPoolCheckingInConnectionEvent ( connection . ConnectionId , EventContext . OperationId ) ) ;
342
356
}
343
357
344
- if ( _checkingInConnectionEventHandler != null )
358
+ if ( _checkedInConnectionEventHandler != null )
345
359
{
346
- _checkingInConnectionEventHandler ( new ConnectionPoolCheckingInConnectionEvent ( connection . ConnectionId , EventContext . OperationId ) ) ;
360
+ _checkedInConnectionEventHandler ( new ConnectionPoolCheckedInConnectionEvent ( connection . ConnectionId , TimeSpan . Zero , EventContext . OperationId ) ) ;
347
361
}
348
362
349
- var stopwatch = Stopwatch . StartNew ( ) ;
350
- _connectionHolder . Return ( connection ) ;
351
- _poolQueue . Release ( ) ;
352
- stopwatch . Stop ( ) ;
363
+ if ( ! connection . IsExpired && _state . Value != State . Disposed )
364
+ {
365
+ _connectionHolder . Return ( connection ) ;
366
+ }
367
+ else
368
+ {
369
+ _connectionHolder . RemoveConnection ( connection ) ;
370
+ }
353
371
354
- if ( _checkedInConnectionEventHandler != null )
372
+ if ( _state . Value != State . Disposed )
355
373
{
356
- _checkedInConnectionEventHandler ( new ConnectionPoolCheckedInConnectionEvent ( connection . ConnectionId , stopwatch . Elapsed , EventContext . OperationId ) ) ;
374
+ _poolQueue . Release ( ) ;
357
375
}
358
376
}
359
377
@@ -404,7 +422,10 @@ public void CheckingOutConnection()
404
422
{
405
423
handler ( new ConnectionPoolCheckingOutConnectionEvent ( _pool . _serverId , EventContext . OperationId ) ) ;
406
424
}
425
+ }
407
426
427
+ public void EnterWaitQueue ( )
428
+ {
408
429
_enteredWaitQueue = _pool . _waitQueue . Wait ( 0 ) ; // don't wait...
409
430
if ( ! _enteredWaitQueue )
410
431
{
@@ -467,7 +488,14 @@ public void HandleException(Exception ex)
467
488
var handler = _pool . _checkingOutConnectionFailedEventHandler ;
468
489
if ( handler != null )
469
490
{
470
- handler ( new ConnectionPoolCheckingOutConnectionFailedEvent ( _pool . _serverId , ex , EventContext . OperationId ) ) ;
491
+ ConnectionCheckOutFailedReason reason ;
492
+ switch ( ex )
493
+ {
494
+ case ObjectDisposedException _: reason = ConnectionCheckOutFailedReason . PoolClosed ; break ;
495
+ case TimeoutException _: reason = ConnectionCheckOutFailedReason . Timeout ; break ;
496
+ default : reason = ConnectionCheckOutFailedReason . ConnectionError ; break ;
497
+ }
498
+ handler ( new ConnectionPoolCheckingOutConnectionFailedEvent ( _pool . _serverId , ex , EventContext . OperationId , reason ) ) ;
471
499
}
472
500
}
473
501
@@ -788,19 +816,13 @@ public PooledConnection Acquire()
788
816
789
817
public void Return ( PooledConnection connection )
790
818
{
791
- if ( connection . IsExpired )
792
- {
793
- RemoveConnection ( connection ) ;
794
- return ;
795
- }
796
-
797
819
lock ( _lock )
798
820
{
799
821
_connections . Add ( connection ) ;
800
822
}
801
823
}
802
824
803
- private void RemoveConnection ( PooledConnection connection )
825
+ public void RemoveConnection ( PooledConnection connection )
804
826
{
805
827
if ( _removingConnectionEventHandler != null )
806
828
{
0 commit comments