3
3
using Enyim . Caching . Memcached . Results ;
4
4
using Enyim . Caching . Memcached . Results . Extensions ;
5
5
using Enyim . Collections ;
6
+
6
7
using Microsoft . Extensions . Logging ;
8
+
7
9
using System ;
8
10
using System . Collections . Concurrent ;
9
11
using System . Collections . Generic ;
@@ -25,7 +27,7 @@ namespace Enyim.Caching.Memcached
25
27
public class MemcachedNode : IMemcachedNode
26
28
{
27
29
private readonly ILogger _logger ;
28
- private static readonly object SyncRoot = new Object ( ) ;
30
+ private static readonly object SyncRoot = new ( ) ;
29
31
private bool _isDisposed ;
30
32
private readonly EndPoint _endPoint ;
31
33
private readonly ISocketPoolConfiguration _config ;
@@ -46,8 +48,8 @@ public MemcachedNode(
46
48
EndPointString = endpoint ? . ToString ( ) . Replace ( "Unspecified/" , string . Empty ) ;
47
49
_config = socketPoolConfig ;
48
50
49
- if ( socketPoolConfig . ConnectionTimeout . TotalMilliseconds >= Int32 . MaxValue )
50
- throw new InvalidOperationException ( "ConnectionTimeout must be < Int32 .MaxValue" ) ;
51
+ if ( socketPoolConfig . ConnectionTimeout . TotalMilliseconds >= int . MaxValue )
52
+ throw new InvalidOperationException ( "ConnectionTimeout must be < int .MaxValue" ) ;
51
53
52
54
if ( socketPoolConfig . InitPoolTimeout . TotalSeconds < 1 )
53
55
{
@@ -281,6 +283,7 @@ private class InternalPoolImpl : IDisposable
281
283
private readonly EndPoint _endPoint ;
282
284
private readonly TimeSpan _queueTimeout ;
283
285
private readonly TimeSpan _receiveTimeout ;
286
+ private readonly TimeSpan _connectionIdleTimeout ;
284
287
private SemaphoreSlim _semaphore ;
285
288
286
289
private readonly object initLock = new Object ( ) ;
@@ -298,12 +301,15 @@ internal InternalPoolImpl(
298
301
throw new InvalidOperationException ( "queueTimeout must be >= TimeSpan.Zero" , null ) ;
299
302
if ( config . ReceiveTimeout < TimeSpan . Zero )
300
303
throw new InvalidOperationException ( "ReceiveTimeout must be >= TimeSpan.Zero" , null ) ;
304
+ if ( config . ConnectionIdleTimeout < TimeSpan . Zero )
305
+ throw new InvalidOperationException ( "ConnectionIdleTimeout must be >= TimeSpan.Zero" , null ) ;
301
306
302
307
_ownerNode = ownerNode ;
303
308
_isAlive = true ;
304
309
_endPoint = ownerNode . EndPoint ;
305
310
_queueTimeout = config . QueueTimeout ;
306
311
_receiveTimeout = config . ReceiveTimeout ;
312
+ _connectionIdleTimeout = config . ConnectionIdleTimeout ;
307
313
308
314
_minItems = config . MinPoolSize ;
309
315
_maxItems = config . MaxPoolSize ;
@@ -344,7 +350,7 @@ internal void InitPool()
344
350
}
345
351
catch ( Exception e )
346
352
{
347
- _logger . LogError ( "Could not init pool." , new EventId ( 0 ) , e ) ;
353
+ _logger . LogError ( e , "Could not init pool." ) ;
348
354
349
355
MarkAsDead ( ) ;
350
356
}
@@ -379,7 +385,7 @@ internal async Task InitPoolAsync()
379
385
}
380
386
catch ( Exception e )
381
387
{
382
- _logger . LogError ( "Could not init pool." , new EventId ( 0 ) , e ) ;
388
+ _logger . LogError ( e , "Could not init pool." ) ;
383
389
384
390
MarkAsDead ( ) ;
385
391
}
@@ -418,10 +424,9 @@ public DateTime MarkedAsDeadUtc
418
424
public IPooledSocketResult Acquire ( )
419
425
{
420
426
var result = new PooledSocketResult ( ) ;
421
- var message = string . Empty ;
422
-
423
427
if ( _isDebugEnabled ) _logger . LogDebug ( $ "Acquiring stream from pool on node '{ _endPoint } '") ;
424
428
429
+ string message ;
425
430
if ( ! _isAlive || _isDisposed )
426
431
{
427
432
message = "Pool is dead or disposed, returning null. " + _endPoint ;
@@ -432,8 +437,6 @@ public IPooledSocketResult Acquire()
432
437
return result ;
433
438
}
434
439
435
- PooledSocket retval = null ;
436
-
437
440
if ( ! _semaphore . Wait ( _queueTimeout ) )
438
441
{
439
442
message = "Pool is full, timeouting. " + _endPoint ;
@@ -456,20 +459,23 @@ public IPooledSocketResult Acquire()
456
459
return result ;
457
460
}
458
461
462
+
463
+ PooledSocket socket ;
459
464
// do we have free items?
460
- if ( _freeItems . TryPop ( out retval ) )
465
+ if ( TryPopPooledSocket ( out socket ) )
461
466
{
462
467
#region [ get it from the pool ]
463
468
464
469
try
465
470
{
466
- retval . Reset ( ) ;
471
+ socket . Reset ( ) ;
467
472
468
- message = "Socket was reset. " + retval . InstanceId ;
473
+ message = "Socket was reset. " + socket . InstanceId ;
469
474
if ( _isDebugEnabled ) _logger . LogDebug ( message ) ;
470
475
471
476
result . Pass ( message ) ;
472
- result . Value = retval ;
477
+ socket . UpdateLastUsed ( ) ;
478
+ result . Value = socket ;
473
479
return result ;
474
480
}
475
481
catch ( Exception e )
@@ -495,9 +501,9 @@ public IPooledSocketResult Acquire()
495
501
{
496
502
// okay, create the new item
497
503
var startTime = DateTime . Now ;
498
- retval = CreateSocket ( ) ;
504
+ socket = CreateSocket ( ) ;
499
505
_logger . LogInformation ( "MemcachedAcquire-CreateSocket: {0}ms" , ( DateTime . Now - startTime ) . TotalMilliseconds ) ;
500
- result . Value = retval ;
506
+ result . Value = socket ;
501
507
result . Pass ( ) ;
502
508
}
503
509
catch ( Exception e )
@@ -542,7 +548,7 @@ public async Task<IPooledSocketResult> AcquireAsync()
542
548
return result ;
543
549
}
544
550
545
- PooledSocket retval = null ;
551
+ PooledSocket socket = null ;
546
552
547
553
if ( ! await _semaphore . WaitAsync ( _queueTimeout ) )
548
554
{
@@ -566,13 +572,13 @@ public async Task<IPooledSocketResult> AcquireAsync()
566
572
}
567
573
568
574
// do we have free items?
569
- if ( _freeItems . TryPop ( out retval ) )
575
+ if ( TryPopPooledSocket ( out socket ) )
570
576
{
571
577
#region [ get it from the pool ]
572
578
573
579
try
574
580
{
575
- var resetTask = retval . ResetAsync ( ) ;
581
+ var resetTask = socket . ResetAsync ( ) ;
576
582
577
583
if ( await Task . WhenAny ( resetTask , Task . Delay ( _receiveTimeout ) ) == resetTask )
578
584
{
@@ -581,19 +587,20 @@ public async Task<IPooledSocketResult> AcquireAsync()
581
587
else
582
588
{
583
589
_semaphore . Release ( ) ;
584
- retval . IsAlive = false ;
590
+ socket . IsAlive = false ;
585
591
586
- message = "Timeout to reset an acquired socket. InstanceId " + retval . InstanceId ;
592
+ message = "Timeout to reset an acquired socket. InstanceId " + socket . InstanceId ;
587
593
_logger . LogError ( message ) ;
588
594
result . Fail ( message ) ;
589
595
return result ;
590
596
}
591
597
592
- message = "Socket was reset. InstanceId " + retval . InstanceId ;
598
+ message = "Socket was reset. InstanceId " + socket . InstanceId ;
593
599
if ( _isDebugEnabled ) _logger . LogDebug ( message ) ;
594
600
595
601
result . Pass ( message ) ;
596
- result . Value = retval ;
602
+ socket . UpdateLastUsed ( ) ;
603
+ result . Value = socket ;
597
604
return result ;
598
605
}
599
606
catch ( Exception e )
@@ -619,9 +626,9 @@ public async Task<IPooledSocketResult> AcquireAsync()
619
626
{
620
627
// okay, create the new item
621
628
var startTime = DateTime . Now ;
622
- retval = await CreateSocketAsync ( ) ;
629
+ socket = await CreateSocketAsync ( ) ;
623
630
_logger . LogInformation ( "MemcachedAcquire-CreateSocket: {0}ms" , ( DateTime . Now - startTime ) . TotalMilliseconds ) ;
624
- result . Value = retval ;
631
+ result . Value = socket ;
625
632
result . Pass ( ) ;
626
633
}
627
634
catch ( Exception e )
@@ -737,6 +744,34 @@ private void ReleaseSocket(PooledSocket socket)
737
744
}
738
745
}
739
746
747
+ private bool TryPopPooledSocket ( out PooledSocket pooledSocket )
748
+ {
749
+ if ( _freeItems . TryPop ( out var socket ) )
750
+ {
751
+ if ( _connectionIdleTimeout > TimeSpan . Zero &&
752
+ socket . LastUsed < DateTime . UtcNow . Subtract ( _connectionIdleTimeout ) )
753
+ {
754
+ try
755
+ {
756
+ _logger . LogInformation ( "Connection idle timeout {idleTimeout} reached." , _connectionIdleTimeout ) ;
757
+ socket . Destroy ( ) ;
758
+ }
759
+ catch ( Exception ex )
760
+ {
761
+ _logger . LogError ( ex , $ "Failed to destroy { nameof ( PooledSocket ) } ") ;
762
+ }
763
+
764
+ pooledSocket = null ;
765
+ return false ;
766
+ }
767
+
768
+ pooledSocket = socket ;
769
+ return true ;
770
+ }
771
+
772
+ pooledSocket = null ;
773
+ return false ;
774
+ }
740
775
741
776
~ InternalPoolImpl ( )
742
777
{
@@ -758,12 +793,10 @@ public void Dispose()
758
793
_isAlive = false ;
759
794
_isDisposed = true ;
760
795
761
- PooledSocket ps ;
762
-
763
- while ( _freeItems . TryPop ( out ps ) )
796
+ while ( _freeItems . TryPop ( out var socket ) )
764
797
{
765
- try { ps . Destroy ( ) ; }
766
- catch { }
798
+ try { socket . Destroy ( ) ; }
799
+ catch ( Exception ex ) { _logger . LogError ( ex , $ "failed to destroy { nameof ( PooledSocket ) } " ) ; }
767
800
}
768
801
769
802
_ownerNode = null ;
0 commit comments