Skip to content

Commit efd7a8f

Browse files
authored
Merge pull request #69 from cnblogs/read-response-async
Read response async
2 parents 4585857 + 960895e commit efd7a8f

21 files changed

+863
-759
lines changed

Enyim.Caching/Enyim.Caching.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,6 @@
2525
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="2.1.2" />
2626
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.1.1" />
2727
<PackageReference Include="Newtonsoft.Json.Bson" Version="1.0.1" />
28+
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.1" />
2829
</ItemGroup>
2930
</Project>

Enyim.Caching/Memcached/MemcachedNode.cs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,3 @@
1-
using Enyim.Caching.Configuration;
2-
using Enyim.Caching.Memcached.Protocol.Binary;
3-
using Enyim.Caching.Memcached.Results;
4-
using Enyim.Caching.Memcached.Results.Extensions;
5-
using Enyim.Collections;
6-
using Microsoft.Extensions.Logging;
71
using System;
82
using System.Collections.Generic;
93
using System.Diagnostics;
@@ -13,6 +7,12 @@
137
using System.Security;
148
using System.Threading;
159
using System.Threading.Tasks;
10+
using Enyim.Caching.Configuration;
11+
using Enyim.Caching.Memcached.Protocol.Binary;
12+
using Enyim.Caching.Memcached.Results;
13+
using Enyim.Caching.Memcached.Results.Extensions;
14+
using Enyim.Collections;
15+
using Microsoft.Extensions.Logging;
1616

1717
namespace Enyim.Caching.Memcached
1818
{
@@ -591,14 +591,13 @@ protected async virtual Task<IPooledSocketResult> ExecuteOperationAsync(IOperati
591591
{
592592
var pooledSocket = result.Value;
593593

594-
595594
//if Get, call BinaryRequest.CreateBuffer()
596595
var b = op.GetBuffer();
597596

598597
await pooledSocket.WriteSync(b);
599598

600599
//if Get, call BinaryResponse
601-
var readResult = op.ReadResponse(pooledSocket);
600+
var readResult = await op.ReadResponseAsync(pooledSocket);
602601
if (readResult.Success)
603602
{
604603
result.Pass();

Enyim.Caching/Memcached/PooledSocket.cs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
using Microsoft.Extensions.Logging;
21
using System;
32
using System.Collections.Generic;
43
using System.Diagnostics;
@@ -11,6 +10,7 @@
1110
using System.Text;
1211
using System.Threading;
1312
using System.Threading.Tasks;
13+
using Microsoft.Extensions.Logging;
1414

1515
namespace Enyim.Caching.Memcached
1616
{
@@ -208,6 +208,21 @@ public int ReadByte()
208208
}
209209
}
210210

211+
public int ReadByteAsync()
212+
{
213+
this.CheckDisposed();
214+
215+
try
216+
{
217+
return this.inputStream.ReadByte();
218+
}
219+
catch (IOException)
220+
{
221+
this.isAlive = false;
222+
throw;
223+
}
224+
}
225+
211226
public async Task<byte[]> ReadBytesAsync(int count)
212227
{
213228
var buffer = new ArraySegment<byte>(new byte[count], 0, count);

Enyim.Caching/Memcached/Protocol/Binary/BinaryConverter.cs

Lines changed: 28 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -5,110 +5,58 @@ namespace Enyim.Caching.Memcached.Protocol.Binary
55
{
66
public static class BinaryConverter
77
{
8-
public static unsafe ushort DecodeUInt16(byte[] buffer, int offset)
8+
public static ushort DecodeUInt16(Span<byte> buffer, int offset)
99
{
1010
return (ushort)((buffer[offset] << 8) + buffer[offset + 1]);
1111
}
1212

13-
public static unsafe ushort DecodeUInt16(byte* buffer, int offset)
13+
public static int DecodeInt32(Span<byte> buffer, int offset)
1414
{
15-
return (ushort)((buffer[offset] << 8) + buffer[offset + 1]);
16-
}
17-
18-
public static unsafe int DecodeInt32(ArraySegment<byte> segment, int offset)
19-
{
20-
fixed (byte* buffer = segment.Array)
21-
{
22-
byte* ptr = buffer + segment.Offset + offset;
23-
24-
return DecodeInt32(buffer, 0);
25-
}
26-
}
27-
28-
public static unsafe int DecodeInt32(byte* buffer, int offset)
29-
{
30-
buffer += offset;
31-
32-
return (buffer[0] << 24) | (buffer[1] << 16) | (buffer[2] << 8) | buffer[3];
33-
}
34-
35-
public static unsafe int DecodeInt32(byte[] buffer, int offset)
36-
{
37-
return (buffer[offset] << 24) | (buffer[offset + 1] << 16) | (buffer[offset + 2] << 8) | buffer[offset + 3];
38-
}
15+
var slice = buffer.Slice(offset);
3916

40-
public static unsafe ulong DecodeUInt64(byte[] buffer, int offset)
41-
{
42-
fixed (byte* ptr = buffer)
43-
{
44-
return DecodeUInt64(ptr, offset);
45-
}
17+
return (slice[0] << 24) | (slice[1] << 16) | (slice[2] << 8) | slice[3];
4618
}
4719

48-
public static unsafe ulong DecodeUInt64(byte* buffer, int offset)
20+
public static unsafe ulong DecodeUInt64(Span<byte> buffer, int offset)
4921
{
50-
buffer += offset;
22+
var slice = buffer.Slice(offset);
5123

52-
var part1 = (uint)((buffer[0] << 24) | (buffer[1] << 16) | (buffer[2] << 8) | buffer[3]);
53-
var part2 = (uint)((buffer[4] << 24) | (buffer[5] << 16) | (buffer[6] << 8) | buffer[7]);
24+
var part1 = (uint)((slice[0] << 24) | (slice[1] << 16) | (slice[2] << 8) | slice[3]);
25+
var part2 = (uint)((slice[4] << 24) | (slice[5] << 16) | (slice[6] << 8) | slice[7]);
5426

5527
return ((ulong)part1 << 32) | part2;
5628
}
5729

58-
public static unsafe void EncodeUInt16(uint value, byte[] buffer, int offset)
59-
{
60-
fixed (byte* bufferPtr = buffer)
61-
{
62-
EncodeUInt16(value, bufferPtr, offset);
63-
}
64-
}
65-
66-
public static unsafe void EncodeUInt16(uint value, byte* buffer, int offset)
30+
public static unsafe void EncodeUInt16(uint value, Span<byte> buffer, int offset)
6731
{
68-
byte* ptr = buffer + offset;
32+
var slice = buffer.Slice(offset);
6933

70-
ptr[0] = (byte)(value >> 8);
71-
ptr[1] = (byte)(value & 255);
34+
slice[0] = (byte)(value >> 8);
35+
slice[1] = (byte)(value & 255);
7236
}
7337

74-
public static unsafe void EncodeUInt32(uint value, byte[] buffer, int offset)
38+
public static unsafe void EncodeUInt32(uint value, Span<byte> buffer, int offset)
7539
{
76-
fixed (byte* bufferPtr = buffer)
77-
{
78-
EncodeUInt32(value, bufferPtr, offset);
79-
}
80-
}
81-
82-
public static unsafe void EncodeUInt32(uint value, byte* buffer, int offset)
83-
{
84-
byte* ptr = buffer + offset;
40+
var slice = buffer.Slice(offset);
8541

86-
ptr[0] = (byte)(value >> 24);
87-
ptr[1] = (byte)(value >> 16);
88-
ptr[2] = (byte)(value >> 8);
89-
ptr[3] = (byte)(value & 255);
42+
slice[0] = (byte)(value >> 24);
43+
slice[1] = (byte)(value >> 16);
44+
slice[2] = (byte)(value >> 8);
45+
slice[3] = (byte)(value & 255);
9046
}
9147

92-
public static unsafe void EncodeUInt64(ulong value, byte[] buffer, int offset)
48+
public static unsafe void EncodeUInt64(ulong value, Span<byte> buffer, int offset)
9349
{
94-
fixed (byte* bufferPtr = buffer)
95-
{
96-
EncodeUInt64(value, bufferPtr, offset);
97-
}
98-
}
50+
var slice = buffer.Slice(offset);
9951

100-
public static unsafe void EncodeUInt64(ulong value, byte* buffer, int offset)
101-
{
102-
byte* ptr = buffer + offset;
103-
104-
ptr[0] = (byte)(value >> 56);
105-
ptr[1] = (byte)(value >> 48);
106-
ptr[2] = (byte)(value >> 40);
107-
ptr[3] = (byte)(value >> 32);
108-
ptr[4] = (byte)(value >> 24);
109-
ptr[5] = (byte)(value >> 16);
110-
ptr[6] = (byte)(value >> 8);
111-
ptr[7] = (byte)(value & 255);
52+
slice[0] = (byte)(value >> 56);
53+
slice[1] = (byte)(value >> 48);
54+
slice[2] = (byte)(value >> 40);
55+
slice[3] = (byte)(value >> 32);
56+
slice[4] = (byte)(value >> 24);
57+
slice[5] = (byte)(value >> 16);
58+
slice[6] = (byte)(value >> 8);
59+
slice[7] = (byte)(value & 255);
11260
}
11361

11462
public static byte[] EncodeKey(string key)

Enyim.Caching/Memcached/Protocol/Binary/BinaryOperation.cs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Threading.Tasks;
34

45
namespace Enyim.Caching.Memcached.Protocol.Binary
56
{
@@ -12,15 +13,10 @@ protected internal override IList<ArraySegment<byte>> GetBuffer()
1213
return this.Build().CreateBuffer();
1314
}
1415

15-
protected internal override System.Threading.Tasks.Task<Results.IOperationResult> ReadResponseAsync(PooledSocket socket)
16-
{
17-
throw new NotImplementedException();
18-
}
19-
2016
protected internal override bool ReadResponseAsync(PooledSocket socket, System.Action<bool> next)
2117
{
2218
throw new System.NotSupportedException();
23-
}
19+
}
2420
}
2521
}
2622

Enyim.Caching/Memcached/Protocol/Binary/BinaryResponse.cs

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
using System;
2-
using System.Text;
32
using System.Diagnostics;
3+
using System.Text;
44
using System.Threading.Tasks;
55

66
namespace Enyim.Caching.Memcached.Protocol.Binary
@@ -64,7 +64,7 @@ public unsafe bool Read(PooledSocket socket)
6464
if (dataLength > 0)
6565
{
6666
var data = new byte[dataLength];
67-
socket.Read(data, 0, dataLength);
67+
socket.Read(data, 0, dataLength);
6868

6969
this.Extra = new ArraySegment<byte>(data, 0, extraLength);
7070
this.Data = new ArraySegment<byte>(data, extraLength, data.Length - extraLength);
@@ -207,24 +207,22 @@ private void DoDecodeBody(AsyncIOArgs asyncEvent)
207207
if (this.shouldCallNext) this.next(true);
208208
}
209209

210-
private unsafe void DeserializeHeader(byte[] header, out int dataLength, out int extraLength)
210+
211+
private void DeserializeHeader(Span<byte> header, out int dataLength, out int extraLength)
211212
{
212-
fixed (byte* buffer = header)
213-
{
214-
if (buffer[0] != MAGIC_VALUE)
215-
throw new InvalidOperationException("Expected magic value " + MAGIC_VALUE + ", received: " + buffer[0]);
213+
if (header[0] != MAGIC_VALUE)
214+
throw new InvalidOperationException("Expected magic value " + MAGIC_VALUE + ", received: " + header[0]);
216215

217-
this.DataType = buffer[HEADER_DATATYPE];
218-
this.Opcode = buffer[HEADER_OPCODE];
219-
this.StatusCode = BinaryConverter.DecodeUInt16(buffer, HEADER_STATUS);
216+
this.DataType = header[HEADER_DATATYPE];
217+
this.Opcode = header[HEADER_OPCODE];
218+
this.StatusCode = BinaryConverter.DecodeUInt16(header, HEADER_STATUS);
220219

221-
this.KeyLength = BinaryConverter.DecodeUInt16(buffer, HEADER_KEY);
222-
this.CorrelationId = BinaryConverter.DecodeInt32(buffer, HEADER_OPAQUE);
223-
this.CAS = BinaryConverter.DecodeUInt64(buffer, HEADER_CAS);
220+
this.KeyLength = BinaryConverter.DecodeUInt16(header, HEADER_KEY);
221+
this.CorrelationId = BinaryConverter.DecodeInt32(header, HEADER_OPAQUE);
222+
this.CAS = BinaryConverter.DecodeUInt64(header, HEADER_CAS);
224223

225-
dataLength = BinaryConverter.DecodeInt32(buffer, HEADER_BODY);
226-
extraLength = buffer[HEADER_EXTRA];
227-
}
224+
dataLength = BinaryConverter.DecodeInt32(header, HEADER_BODY);
225+
extraLength = header[HEADER_EXTRA];
228226
}
229227

230228
private void LogExecutionTime(string title, DateTime startTime, int thresholdMs)

Enyim.Caching/Memcached/Protocol/Binary/BinarySingleItemOperation.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ protected internal override IOperationResult ReadResponse(PooledSocket socket)
4444
return result;
4545
}
4646

47-
protected internal override async Task<IOperationResult> ReadResponseAsync(PooledSocket socket)
47+
protected internal override async ValueTask<IOperationResult> ReadResponseAsync(PooledSocket socket)
4848
{
4949
var response = new BinaryResponse();
5050
var retval = await response.ReadAsync(socket);

Enyim.Caching/Memcached/Protocol/Binary/FlushOperation.cs

Lines changed: 44 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,54 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Threading.Tasks;
34
using Enyim.Caching.Memcached.Results;
45
using Enyim.Caching.Memcached.Results.Extensions;
56

67
namespace Enyim.Caching.Memcached.Protocol.Binary
78
{
8-
public class FlushOperation : BinaryOperation, IFlushOperation
9-
{
10-
public FlushOperation() { }
11-
12-
protected override BinaryRequest Build()
13-
{
14-
var request = new BinaryRequest(OpCode.Flush);
15-
16-
return request;
17-
}
18-
19-
protected internal override IOperationResult ReadResponse(PooledSocket socket)
20-
{
21-
var response = new BinaryResponse();
22-
var retval = response.Read(socket);
23-
24-
this.StatusCode = StatusCode;
25-
var result = new BinaryOperationResult()
26-
{
27-
Success = retval,
28-
StatusCode = this.StatusCode
29-
};
30-
31-
result.PassOrFail(retval, "Failed to read response");
32-
return result;
33-
}
34-
}
9+
public class FlushOperation : BinaryOperation, IFlushOperation
10+
{
11+
public FlushOperation() { }
12+
13+
protected override BinaryRequest Build()
14+
{
15+
var request = new BinaryRequest(OpCode.Flush);
16+
17+
return request;
18+
}
19+
20+
protected internal override IOperationResult ReadResponse(PooledSocket socket)
21+
{
22+
var response = new BinaryResponse();
23+
var retval = response.Read(socket);
24+
25+
this.StatusCode = StatusCode;
26+
var result = new BinaryOperationResult()
27+
{
28+
Success = retval,
29+
StatusCode = this.StatusCode
30+
};
31+
32+
result.PassOrFail(retval, "Failed to read response");
33+
return result;
34+
}
35+
36+
protected internal override async ValueTask<IOperationResult> ReadResponseAsync(PooledSocket socket)
37+
{
38+
var response = new BinaryResponse();
39+
var retval = await response.ReadAsync(socket);
40+
41+
this.StatusCode = StatusCode;
42+
var result = new BinaryOperationResult()
43+
{
44+
Success = retval,
45+
StatusCode = this.StatusCode
46+
};
47+
48+
result.PassOrFail(retval, "Failed to read response");
49+
return result;
50+
}
51+
}
3552
}
3653

3754
#region [ License information ]

0 commit comments

Comments
 (0)