Skip to content

Commit d9e2fa1

Browse files
committed
Use IValueTaskSource for .NET Core 3.0.
1 parent be3005a commit d9e2fa1

File tree

3 files changed

+78
-13
lines changed

3 files changed

+78
-13
lines changed

src/MySqlConnector/Protocol/Serialization/SocketByteHandler.cs

Lines changed: 74 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,47 @@
1+
#if NETSTANDARD2_1 || NETCOREAPP3_0
2+
#define VALUETASKSOURCE
3+
#endif
4+
15
using System;
26
using System.Net.Sockets;
37
using System.Threading.Tasks;
8+
#if VALUETASKSOURCE
9+
using System.Threading.Tasks.Sources;
10+
#endif
411
using MySql.Data.MySqlClient;
512
using MySqlConnector.Utilities;
613

714
namespace MySqlConnector.Protocol.Serialization
815
{
916
internal sealed class SocketByteHandler : IByteHandler
17+
#if VALUETASKSOURCE
18+
, IValueTaskSource<int>
19+
#endif
1020
{
1121
public SocketByteHandler(Socket socket)
1222
{
1323
m_socket = socket;
14-
var socketEventArgs = new SocketAsyncEventArgs();
15-
m_socketAwaitable = new SocketAwaitable(socketEventArgs);
24+
#if VALUETASKSOURCE
25+
m_valueTaskSource = new ManualResetValueTaskSourceCore<int> { RunContinuationsAsynchronously = true };
26+
m_socketEventArgs = new SocketAsyncEventArgs();
27+
m_socketEventArgs.Completed += (s, e) => PropagateSocketAsyncEventArgsStatus();
28+
#else
29+
m_socketAwaitable = new SocketAwaitable(new SocketAsyncEventArgs());
30+
#endif
1631
m_closeSocket = socket.Dispose;
1732
RemainingTimeout = Constants.InfiniteTimeout;
1833
}
1934

35+
#if VALUETASKSOURCE
36+
public void Dispose() => m_socketEventArgs.Dispose();
37+
#else
2038
public void Dispose() => m_socketAwaitable.EventArgs.Dispose();
39+
#endif
2140

2241
public int RemainingTimeout { get; set; }
2342

24-
public ValueTask<int> ReadBytesAsync(ArraySegment<byte> buffer, IOBehavior ioBehavior)
25-
{
26-
return ioBehavior == IOBehavior.Asynchronous ?
27-
new ValueTask<int>(DoReadBytesAsync(buffer)) : DoReadBytesSync(buffer);
28-
}
43+
public ValueTask<int> ReadBytesAsync(ArraySegment<byte> buffer, IOBehavior ioBehavior) =>
44+
ioBehavior == IOBehavior.Asynchronous ? DoReadBytesAsync(buffer) : DoReadBytesSync(buffer);
2945

3046
private ValueTask<int> DoReadBytesSync(ArraySegment<byte> buffer)
3147
{
@@ -53,18 +69,16 @@ private ValueTask<int> DoReadBytesSync(ArraySegment<byte> buffer)
5369
}
5470
}
5571

56-
private async Task<int> DoReadBytesAsync(ArraySegment<byte> buffer)
72+
private async ValueTask<int> DoReadBytesAsync(ArraySegment<byte> buffer)
5773
{
5874
var startTime = RemainingTimeout == Constants.InfiniteTimeout ? 0 : Environment.TickCount;
5975
var timerId = RemainingTimeout == Constants.InfiniteTimeout ? 0 :
6076
RemainingTimeout <= 0 ? throw MySqlException.CreateForTimeout() :
6177
TimerQueue.Instance.Add(RemainingTimeout, m_closeSocket);
62-
m_socketAwaitable.EventArgs.SetBuffer(buffer.Array, buffer.Offset, buffer.Count);
6378
int bytesRead;
6479
try
6580
{
66-
await m_socket.ReceiveAsync(m_socketAwaitable);
67-
bytesRead = m_socketAwaitable.EventArgs.BytesTransferred;
81+
bytesRead = await ReadBytesFromSocketAsync(buffer).ConfigureAwait(false);
6882
}
6983
catch (SocketException ex)
7084
{
@@ -88,7 +102,7 @@ private async Task<int> DoReadBytesAsync(ArraySegment<byte> buffer)
88102
public ValueTask<int> WriteBytesAsync(ArraySegment<byte> data, IOBehavior ioBehavior)
89103
{
90104
if (ioBehavior == IOBehavior.Asynchronous)
91-
return new ValueTask<int>(DoWriteBytesAsync(data));
105+
return WriteBytesToSocketAsync(data);
92106

93107
try
94108
{
@@ -101,15 +115,62 @@ public ValueTask<int> WriteBytesAsync(ArraySegment<byte> data, IOBehavior ioBeha
101115
}
102116
}
103117

104-
private async Task<int> DoWriteBytesAsync(ArraySegment<byte> data)
118+
#if VALUETASKSOURCE
119+
private ValueTask<int> ReadBytesFromSocketAsync(ArraySegment<byte> buffer)
120+
{
121+
m_socketEventArgs.SetBuffer(buffer.Array, buffer.Offset, buffer.Count);
122+
m_valueTaskSource.Reset();
123+
if (!m_socket.ReceiveAsync(m_socketEventArgs))
124+
PropagateSocketAsyncEventArgsStatus();
125+
return new ValueTask<int>(this, m_valueTaskSource.Version);
126+
}
127+
128+
private ValueTask<int> WriteBytesToSocketAsync(ArraySegment<byte> data)
129+
{
130+
m_socketEventArgs.SetBuffer(data.Array, data.Offset, data.Count);
131+
m_valueTaskSource.Reset();
132+
if (!m_socket.SendAsync(m_socketEventArgs))
133+
PropagateSocketAsyncEventArgsStatus();
134+
return new ValueTask<int>(this, m_valueTaskSource.Version);
135+
}
136+
#else
137+
private async ValueTask<int> ReadBytesFromSocketAsync(ArraySegment<byte> buffer)
138+
{
139+
m_socketAwaitable.EventArgs.SetBuffer(buffer.Array, buffer.Offset, buffer.Count);
140+
await m_socket.ReceiveAsync(m_socketAwaitable);
141+
return m_socketAwaitable.EventArgs.BytesTransferred;
142+
}
143+
144+
private async ValueTask<int> WriteBytesToSocketAsync(ArraySegment<byte> data)
105145
{
106146
m_socketAwaitable.EventArgs.SetBuffer(data.Array, data.Offset, data.Count);
107147
await m_socket.SendAsync(m_socketAwaitable);
108148
return 0;
109149
}
150+
#endif
151+
152+
#if VALUETASKSOURCE
153+
int IValueTaskSource<int>.GetResult(short token) => m_valueTaskSource.GetResult(token);
154+
ValueTaskSourceStatus IValueTaskSource<int>.GetStatus(short token) => m_valueTaskSource.GetStatus(token);
155+
void IValueTaskSource<int>.OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) =>
156+
m_valueTaskSource.OnCompleted(continuation, state, token, flags);
157+
158+
private void PropagateSocketAsyncEventArgsStatus()
159+
{
160+
if (m_socketEventArgs.SocketError != SocketError.Success)
161+
m_valueTaskSource.SetException(new SocketException((int) m_socketEventArgs.SocketError));
162+
else
163+
m_valueTaskSource.SetResult(m_socketEventArgs.BytesTransferred);
164+
}
165+
#endif
110166

111167
readonly Socket m_socket;
168+
#if VALUETASKSOURCE
169+
ManualResetValueTaskSourceCore<int> m_valueTaskSource; // mutable struct; do not make this readonly
170+
readonly SocketAsyncEventArgs m_socketEventArgs;
171+
#else
112172
readonly SocketAwaitable m_socketAwaitable;
173+
#endif
113174
readonly Action m_closeSocket;
114175
}
115176
}

src/MySqlConnector/Utilities/SocketAwaitable.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#if !NETSTANDARD2_1 && !NETCOREAPP3_0
12
using System;
23
using System.Net.Sockets;
34
using System.Runtime.CompilerServices;
@@ -44,3 +45,4 @@ internal void Reset()
4445
Action? m_continuation;
4546
}
4647
}
48+
#endif

src/MySqlConnector/Utilities/SocketExtensions.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ namespace MySqlConnector.Utilities
55
{
66
internal static class SocketExtensions
77
{
8+
#if !NETSTANDARD2_1 && !NETCOREAPP3_0
89
public static SocketAwaitable ReceiveAsync(this Socket socket, SocketAwaitable awaitable)
910
{
1011
awaitable.Reset();
@@ -20,6 +21,7 @@ public static SocketAwaitable SendAsync(this Socket socket, SocketAwaitable awai
2021
awaitable.WasCompleted = true;
2122
return awaitable;
2223
}
24+
#endif
2325

2426
public static void SetKeepAlive(this Socket socket, uint keepAliveTimeSeconds)
2527
{

0 commit comments

Comments
 (0)