Skip to content

Commit 5c652b6

Browse files
authored
Merge pull request #109 from cnblogs/improve-PooledSocket-WriteAsync
Improve PooledSocket.WriteAsync
2 parents c60e09f + fb7ebde commit 5c652b6

File tree

4 files changed

+232
-188
lines changed

4 files changed

+232
-188
lines changed

Enyim.Caching/Memcached/AsyncSocketHelper.cs

Lines changed: 151 additions & 150 deletions
Original file line numberDiff line numberDiff line change
@@ -11,181 +11,182 @@
1111

1212
namespace Enyim.Caching.Memcached
1313
{
14-
public partial class AsyncPooledSocket
15-
{
16-
/// <summary>
17-
/// Supports exactly one reader and writer, but they can do IO concurrently
18-
/// </summary>
19-
private class AsyncSocketHelper
20-
{
21-
private const int ChunkSize = 65536;
22-
23-
private AsyncPooledSocket socket;
24-
private SlidingBuffer asyncBuffer;
25-
26-
private SocketAsyncEventArgs readEvent;
14+
public partial class AsyncPooledSocket
15+
{
16+
/// <summary>
17+
/// Supports exactly one reader and writer, but they can do IO concurrently
18+
/// </summary>
19+
private class AsyncSocketHelper
20+
{
21+
private const int ChunkSize = 65536;
22+
23+
private readonly PooledSocket socket;
24+
private readonly SlidingBuffer asyncBuffer;
25+
26+
private readonly SocketAsyncEventArgs readEvent;
2727
#if DEBUG_IO
2828
private int doingIO;
2929
#endif
30-
private int remainingRead;
31-
private int expectedToRead;
32-
private AsyncIOArgs pendingArgs;
33-
34-
private int isAborted;
35-
private ManualResetEvent readInProgressEvent;
36-
37-
public AsyncSocketHelper(AsyncPooledSocket socket)
38-
{
39-
this.socket = socket;
40-
this.asyncBuffer = new SlidingBuffer(ChunkSize);
41-
42-
this.readEvent = new SocketAsyncEventArgs();
43-
this.readEvent.Completed += new EventHandler<SocketAsyncEventArgs>(AsyncReadCompleted);
44-
this.readEvent.SetBuffer(new byte[ChunkSize], 0, ChunkSize);
45-
46-
this.readInProgressEvent = new ManualResetEvent(false);
47-
}
48-
49-
/// <summary>
50-
/// returns true if io is pending
51-
/// </summary>
52-
/// <param name="p"></param>
53-
/// <returns></returns>
54-
public bool Read(AsyncIOArgs p)
55-
{
56-
var count = p.Count;
57-
if (count < 1) throw new ArgumentOutOfRangeException("count", "count must be > 0");
30+
private int remainingRead;
31+
private int expectedToRead;
32+
private AsyncIOArgs pendingArgs;
33+
34+
private int isAborted;
35+
private readonly ManualResetEvent readInProgressEvent;
36+
37+
public AsyncSocketHelper(PooledSocket socket)
38+
{
39+
this.socket = socket;
40+
this.asyncBuffer = new SlidingBuffer(ChunkSize);
41+
42+
this.readEvent = new SocketAsyncEventArgs();
43+
this.readEvent.Completed += new EventHandler<SocketAsyncEventArgs>(AsyncReadCompleted);
44+
this.readEvent.SetBuffer(new byte[ChunkSize], 0, ChunkSize);
45+
46+
this.readInProgressEvent = new ManualResetEvent(false);
47+
}
48+
49+
/// <summary>
50+
/// returns true if io is pending
51+
/// </summary>
52+
/// <param name="p"></param>
53+
/// <returns></returns>
54+
public bool Read(AsyncIOArgs p)
55+
{
56+
var count = p.Count;
57+
if (count < 1) throw new ArgumentOutOfRangeException("count", "count must be > 0");
5858
#if DEBUG_IO
5959
if (Interlocked.CompareExchange(ref this.doingIO, 1, 0) != 0)
6060
throw new InvalidOperationException("Receive is already in progress");
6161
#endif
62-
this.expectedToRead = p.Count;
63-
this.pendingArgs = p;
64-
65-
p.Fail = false;
66-
p.Result = null;
67-
68-
if (this.asyncBuffer.Available >= count)
69-
{
70-
PublishResult(false);
71-
72-
return false;
73-
}
74-
else
75-
{
76-
this.remainingRead = count - this.asyncBuffer.Available;
77-
this.isAborted = 0;
78-
79-
this.BeginReceive();
80-
81-
return true;
82-
}
83-
}
84-
85-
public void DiscardBuffer()
86-
{
87-
this.asyncBuffer.UnsafeClear();
88-
}
89-
90-
private void BeginReceive()
91-
{
92-
while (this.remainingRead > 0)
93-
{
94-
this.readInProgressEvent.Reset();
95-
96-
if (this.socket._socket.ReceiveAsync(this.readEvent))
97-
{
98-
// wait until the timeout elapses, then abort this reading process
99-
// EndREceive will be triggered sooner or later but its timeout
100-
// may be higher than our read timeout, so it's not reliable
101-
if (!readInProgressEvent.WaitOne(this.socket._socket.ReceiveTimeout))
102-
this.AbortReadAndTryPublishError(false);
103-
104-
return;
105-
}
106-
107-
this.EndReceive();
108-
}
109-
}
110-
111-
void AsyncReadCompleted(object sender, SocketAsyncEventArgs e)
112-
{
113-
if (this.EndReceive())
114-
this.BeginReceive();
115-
}
116-
117-
private void AbortReadAndTryPublishError(bool markAsDead)
118-
{
119-
if (markAsDead)
120-
this.socket._isAlive = false;
121-
122-
// we've been already aborted, so quit
123-
// both the EndReceive and the wait on the event can abort the read
124-
// but only one should of them should continue the async call chain
125-
if (Interlocked.CompareExchange(ref this.isAborted, 1, 0) != 0)
126-
return;
127-
128-
this.remainingRead = 0;
129-
var p = this.pendingArgs;
62+
this.expectedToRead = p.Count;
63+
this.pendingArgs = p;
64+
65+
p.Fail = false;
66+
p.Result = null;
67+
68+
if (this.asyncBuffer.Available >= count)
69+
{
70+
PublishResult(false);
71+
72+
return false;
73+
}
74+
else
75+
{
76+
this.remainingRead = count - this.asyncBuffer.Available;
77+
this.isAborted = 0;
78+
79+
this.BeginReceive();
80+
81+
return true;
82+
}
83+
}
84+
85+
public void DiscardBuffer()
86+
{
87+
this.asyncBuffer.UnsafeClear();
88+
}
89+
90+
private void BeginReceive()
91+
{
92+
throw new NotImplementedException();
93+
//while (this.remainingRead > 0)
94+
//{
95+
// this.readInProgressEvent.Reset();
96+
97+
// if (this.socket.ReceiveAsync(this.readEvent))
98+
// {
99+
// // wait until the timeout elapses, then abort this reading process
100+
// // EndREceive will be triggered sooner or later but its timeout
101+
// // may be higher than our read timeout, so it's not reliable
102+
// if (!readInProgressEvent.WaitOne(this.socket._socket.ReceiveTimeout))
103+
// this.AbortReadAndTryPublishError(false);
104+
105+
// return;
106+
// }
107+
108+
// this.EndReceive();
109+
//}
110+
}
111+
112+
void AsyncReadCompleted(object sender, SocketAsyncEventArgs e)
113+
{
114+
if (this.EndReceive())
115+
this.BeginReceive();
116+
}
117+
118+
private void AbortReadAndTryPublishError(bool markAsDead)
119+
{
120+
if (markAsDead)
121+
this.socket.IsAlive = false;
122+
123+
// we've been already aborted, so quit
124+
// both the EndReceive and the wait on the event can abort the read
125+
// but only one should of them should continue the async call chain
126+
if (Interlocked.CompareExchange(ref this.isAborted, 1, 0) != 0)
127+
return;
128+
129+
this.remainingRead = 0;
130+
var p = this.pendingArgs;
130131
#if DEBUG_IO
131132
Thread.MemoryBarrier();
132133

133134
this.doingIO = 0;
134135
#endif
135136

136-
p.Fail = true;
137-
p.Result = null;
137+
p.Fail = true;
138+
p.Result = null;
138139

139-
this.pendingArgs.Next(p);
140-
}
140+
this.pendingArgs.Next(p);
141+
}
141142

142-
/// <summary>
143-
/// returns true when io is pending
144-
/// </summary>
145-
/// <returns></returns>
146-
private bool EndReceive()
147-
{
148-
this.readInProgressEvent.Set();
143+
/// <summary>
144+
/// returns true when io is pending
145+
/// </summary>
146+
/// <returns></returns>
147+
private bool EndReceive()
148+
{
149+
this.readInProgressEvent.Set();
149150

150-
var read = this.readEvent.BytesTransferred;
151-
if (this.readEvent.SocketError != SocketError.Success
152-
|| read == 0)
153-
{
154-
this.AbortReadAndTryPublishError(true);//new IOException("Remote end has been closed"));
151+
var read = this.readEvent.BytesTransferred;
152+
if (this.readEvent.SocketError != SocketError.Success
153+
|| read == 0)
154+
{
155+
this.AbortReadAndTryPublishError(true);//new IOException("Remote end has been closed"));
155156

156-
return false;
157-
}
157+
return false;
158+
}
158159

159-
this.remainingRead -= read;
160-
this.asyncBuffer.Append(this.readEvent.Buffer, 0, read);
160+
this.remainingRead -= read;
161+
this.asyncBuffer.Append(this.readEvent.Buffer, 0, read);
161162

162-
if (this.remainingRead <= 0)
163-
{
164-
this.PublishResult(true);
163+
if (this.remainingRead <= 0)
164+
{
165+
this.PublishResult(true);
165166

166-
return false;
167-
}
167+
return false;
168+
}
168169

169-
return true;
170-
}
170+
return true;
171+
}
171172

172-
private void PublishResult(bool isAsync)
173-
{
174-
var retval = this.pendingArgs;
173+
private void PublishResult(bool isAsync)
174+
{
175+
var retval = this.pendingArgs;
175176

176-
var data = new byte[this.expectedToRead];
177-
this.asyncBuffer.Read(data, 0, retval.Count);
178-
pendingArgs.Result = data;
177+
var data = new byte[this.expectedToRead];
178+
this.asyncBuffer.Read(data, 0, retval.Count);
179+
pendingArgs.Result = data;
179180
#if DEBUG_IO
180181
Thread.MemoryBarrier();
181182
this.doingIO = 0;
182183
#endif
183184

184-
if (isAsync)
185-
pendingArgs.Next(pendingArgs);
186-
}
187-
}
188-
}
185+
if (isAsync)
186+
pendingArgs.Next(pendingArgs);
187+
}
188+
}
189+
}
189190

190191
public partial class PooledSocket
191192
{
@@ -196,10 +197,10 @@ private class AsyncSocketHelper
196197
{
197198
private const int ChunkSize = 65536;
198199

199-
private PooledSocket socket;
200-
private SlidingBuffer asyncBuffer;
200+
private readonly PooledSocket socket;
201+
private readonly SlidingBuffer asyncBuffer;
201202

202-
private SocketAsyncEventArgs readEvent;
203+
private readonly SocketAsyncEventArgs readEvent;
203204
#if DEBUG_IO
204205
private int doingIO;
205206
#endif

Enyim.Caching/Memcached/MemcachedNode.cs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
using System.Diagnostics;
1010
using System.IO;
1111
using System.Net;
12+
using System.Net.Sockets;
1213
using System.Runtime.Serialization;
1314
using System.Security;
1415
using System.Threading;
@@ -184,6 +185,7 @@ public async Task<IPooledSocketResult> AcquireAsync()
184185
poolInitSemaphore.Release();
185186
}
186187
}
188+
187189
try
188190
{
189191
return await this.internalPoolImpl.AcquireAsync();
@@ -442,7 +444,6 @@ public IPooledSocketResult Acquire()
442444
message = "Could not get a socket from the pool, Creating a new item. " + _endPoint;
443445
if (_isDebugEnabled) _logger.LogDebug(message);
444446

445-
446447
try
447448
{
448449
// okay, create the new item
@@ -805,9 +806,11 @@ protected virtual async Task<IPooledSocketResult> ExecuteOperationAsync(IOperati
805806
//if Get, call BinaryRequest.CreateBuffer()
806807
var b = op.GetBuffer();
807808

808-
await pooledSocket.WriteSync(b);
809+
_logger.LogDebug("pooledSocket.WriteAsync...");
810+
await pooledSocket.WriteAsync(b);
809811

810812
//if Get, call BinaryResponse
813+
_logger.LogDebug($"{op}.ReadResponseAsync...");
811814
var readResult = await op.ReadResponseAsync(pooledSocket);
812815
if (readResult.Success)
813816
{
@@ -823,7 +826,14 @@ protected virtual async Task<IPooledSocketResult> ExecuteOperationAsync(IOperati
823826
{
824827
_logger.LogError(nameof(MemcachedNode), e);
825828

826-
result.Fail("Exception reading response", e);
829+
result.Fail("IOException reading response", e);
830+
return result;
831+
}
832+
catch (SocketException e)
833+
{
834+
_logger.LogError(nameof(MemcachedNode), e);
835+
836+
result.Fail("SocketException reading response", e);
827837
return result;
828838
}
829839
finally

0 commit comments

Comments
 (0)