@@ -29,7 +29,7 @@ public class MemcachedNode : IMemcachedNode
29
29
private bool isDisposed ;
30
30
31
31
private readonly EndPoint _endPoint ;
32
- private readonly ISocketPoolConfiguration config ;
32
+ private readonly ISocketPoolConfiguration _config ;
33
33
private InternalPoolImpl internalPoolImpl ;
34
34
private bool isInitialized = false ;
35
35
private SemaphoreSlim poolInitSemaphore = new SemaphoreSlim ( 1 , 1 ) ;
@@ -42,7 +42,7 @@ public MemcachedNode(
42
42
{
43
43
_endPoint = endpoint ;
44
44
EndPointString = endpoint ? . ToString ( ) . Replace ( "Unspecified/" , string . Empty ) ;
45
- this . config = socketPoolConfig ;
45
+ _config = socketPoolConfig ;
46
46
47
47
if ( socketPoolConfig . ConnectionTimeout . TotalMilliseconds >= Int32 . MaxValue )
48
48
throw new InvalidOperationException ( "ConnectionTimeout must be < Int32.MaxValue" ) ;
@@ -65,7 +65,7 @@ public MemcachedNode(
65
65
66
66
protected INodeFailurePolicy FailurePolicy
67
67
{
68
- get { return this . failurePolicy ?? ( this . failurePolicy = this . config . FailurePolicyFactory . Create ( this ) ) ; }
68
+ get { return this . failurePolicy ?? ( this . failurePolicy = _config . FailurePolicyFactory . Create ( this ) ) ; }
69
69
}
70
70
71
71
/// <summary>
@@ -121,7 +121,7 @@ public bool Ping()
121
121
// it's easier to create a new pool than reinitializing a dead one
122
122
// rewrite-then-dispose to avoid a race condition with Acquire (which does no locking)
123
123
var oldPool = this . internalPoolImpl ;
124
- var newPool = new InternalPoolImpl ( this , this . config , _logger ) ;
124
+ var newPool = new InternalPoolImpl ( this , _config , _logger ) ;
125
125
126
126
Interlocked . Exchange ( ref this . internalPoolImpl , newPool ) ;
127
127
@@ -774,7 +774,7 @@ protected internal virtual PooledSocket CreateSocket()
774
774
{
775
775
try
776
776
{
777
- var ps = new PooledSocket ( _endPoint , this . config . ConnectionTimeout , this . config . ReceiveTimeout , _logger ) ;
777
+ var ps = new PooledSocket ( _endPoint , _config . ConnectionTimeout , _config . ReceiveTimeout , _logger ) ;
778
778
ps . Connect ( ) ;
779
779
return ps ;
780
780
}
@@ -790,7 +790,7 @@ protected internal virtual async Task<PooledSocket> CreateSocketAsync()
790
790
{
791
791
try
792
792
{
793
- var ps = new PooledSocket ( _endPoint , this . config . ConnectionTimeout , this . config . ReceiveTimeout , _logger ) ;
793
+ var ps = new PooledSocket ( _endPoint , _config . ConnectionTimeout , _config . ReceiveTimeout , _logger ) ;
794
794
await ps . ConnectAsync ( ) ;
795
795
return ps ;
796
796
}
@@ -872,11 +872,26 @@ protected virtual async Task<IPooledSocketResult> ExecuteOperationAsync(IOperati
872
872
var b = op . GetBuffer ( ) ;
873
873
874
874
_logger . LogDebug ( "pooledSocket.WriteAsync..." ) ;
875
- await pooledSocket . WriteAsync ( b ) ;
875
+
876
+ var writeSocketTask = pooledSocket . WriteAsync ( b ) ;
877
+ if ( await Task . WhenAny ( writeSocketTask , Task . Delay ( _config . ConnectionTimeout ) ) != writeSocketTask )
878
+ {
879
+ result . Fail ( "Timeout to pooledSocket.WriteAsync" ) ;
880
+ return result ;
881
+ }
882
+ await writeSocketTask ;
876
883
877
884
//if Get, call BinaryResponse
878
885
_logger . LogDebug ( $ "{ op } .ReadResponseAsync...") ;
879
- var readResult = await op . ReadResponseAsync ( pooledSocket ) ;
886
+
887
+ var readResponseTask = op . ReadResponseAsync ( pooledSocket ) ;
888
+ if ( await Task . WhenAny ( readResponseTask , Task . Delay ( _config . ConnectionTimeout ) ) != readResponseTask )
889
+ {
890
+ result . Fail ( $ "Timeout to ReadResponseAsync(pooledSocket) for { op } ") ;
891
+ return result ;
892
+ }
893
+
894
+ var readResult = await readResponseTask ;
880
895
if ( readResult . Success )
881
896
{
882
897
result . Pass ( ) ;
@@ -974,12 +989,12 @@ bool IMemcachedNode.Ping()
974
989
975
990
IOperationResult IMemcachedNode . Execute ( IOperation op )
976
991
{
977
- return this . ExecuteOperation ( op ) ;
992
+ return ExecuteOperation ( op ) ;
978
993
}
979
994
980
995
async Task < IOperationResult > IMemcachedNode . ExecuteAsync ( IOperation op )
981
996
{
982
- return await this . ExecuteOperationAsync ( op ) ;
997
+ return await ExecuteOperationAsync ( op ) ;
983
998
}
984
999
985
1000
async Task < bool > IMemcachedNode . ExecuteAsync ( IOperation op , Action < bool > next )
0 commit comments