Skip to content

Commit 8c7cb58

Browse files
committed
Reduce memory usage when reading payloads that span multiple packets.
1 parent b50d817 commit 8c7cb58

File tree

6 files changed

+52
-17
lines changed

6 files changed

+52
-17
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
using System;
2+
3+
namespace MySql.Data.Protocol.Serialization
4+
{
5+
/// <summary>
6+
/// <see cref="ArraySegmentHolder{T}"/> is a class that holds an instance of <see cref="ArraySegment{T}"/>.
7+
/// Its primary difference from <see cref="ArraySegment{T}"/> is that it's a reference type, so mutations
8+
/// to this object are visible to other objects that hold a reference to it.
9+
/// </summary>
10+
internal sealed class ArraySegmentHolder<T>
11+
{
12+
public ArraySegment<T> ArraySegment { get; set; }
13+
14+
public T[] Array => ArraySegment.Array;
15+
public int Offset => ArraySegment.Offset;
16+
public int Count => ArraySegment.Count;
17+
18+
public void Clear()
19+
{
20+
if (ArraySegment.Count > 0)
21+
ArraySegment = new ArraySegment<T>(ArraySegment.Array, 0, 0);
22+
}
23+
}
24+
}

src/MySqlConnector/Protocol/Serialization/CompressedPayloadHandler.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ public IByteHandler ByteHandler
3030
set => throw new NotSupportedException();
3131
}
3232

33-
public ValueTask<ArraySegment<byte>> ReadPayloadAsync(ProtocolErrorBehavior protocolErrorBehavior, IOBehavior ioBehavior) =>
34-
ProtocolUtility.ReadPayloadAsync(m_bufferedByteReader, new CompressedByteHandler(this, protocolErrorBehavior), () => -1, default(ArraySegment<byte>), protocolErrorBehavior, ioBehavior);
33+
public ValueTask<ArraySegment<byte>> ReadPayloadAsync(ArraySegmentHolder<byte> cache, ProtocolErrorBehavior protocolErrorBehavior, IOBehavior ioBehavior) =>
34+
ProtocolUtility.ReadPayloadAsync(m_bufferedByteReader, new CompressedByteHandler(this, protocolErrorBehavior), () => -1, cache, protocolErrorBehavior, ioBehavior);
3535

3636
public ValueTask<int> WritePayloadAsync(ArraySegment<byte> payload, IOBehavior ioBehavior)
3737
{

src/MySqlConnector/Protocol/Serialization/IPayloadHandler.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,15 @@ internal interface IPayloadHandler
1919
/// <summary>
2020
/// Reads the next payload.
2121
/// </summary>
22+
/// <param name="cache">An <see cref="ArraySegmentHolder{Byte}"/> that will cache any buffers allocated during this
23+
/// read. (To disable caching, pass <code>new ArraySegmentHolder&lt;byte&gt;</code> so the cache will be garbage-collected
24+
/// when this method returns.)</param>
2225
/// <param name="protocolErrorBehavior">The <see cref="ProtocolErrorBehavior"/> to use if there is a protocol error.</param>
2326
/// <param name="ioBehavior">The <see cref="IOBehavior"/> to use when reading data.</param>
2427
/// <returns>An <see cref="ArraySegment{Byte}"/> containing the data that was read. This
2528
/// <see cref="ArraySegment{Byte}"/> will be valid to read from until the next time <see cref="ReadPayloadAsync"/> or
2629
/// <see cref="WritePayloadAsync"/> is called.</returns>
27-
ValueTask<ArraySegment<byte>> ReadPayloadAsync(ProtocolErrorBehavior protocolErrorBehavior, IOBehavior ioBehavior);
30+
ValueTask<ArraySegment<byte>> ReadPayloadAsync(ArraySegmentHolder<byte> cache, ProtocolErrorBehavior protocolErrorBehavior, IOBehavior ioBehavior);
2831

2932
/// <summary>
3033
/// Writes a payload.

src/MySqlConnector/Protocol/Serialization/ProtocolUtility.cs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,19 @@ private static ValueTask<Packet> CreatePacketFromPayload(ArraySegment<byte> payl
5858
protocolErrorBehavior == ProtocolErrorBehavior.Throw ? ValueTaskExtensions.FromException<Packet>(new EndOfStreamException()) :
5959
default(ValueTask<Packet>);
6060

61-
public static ValueTask<ArraySegment<byte>> ReadPayloadAsync(BufferedByteReader bufferedByteReader, IByteHandler byteHandler, Func<int> getNextSequenceNumber, ArraySegment<byte> previousPayloads, ProtocolErrorBehavior protocolErrorBehavior, IOBehavior ioBehavior)
61+
public static ValueTask<ArraySegment<byte>> ReadPayloadAsync(BufferedByteReader bufferedByteReader, IByteHandler byteHandler, Func<int> getNextSequenceNumber, ArraySegmentHolder<byte> cache, ProtocolErrorBehavior protocolErrorBehavior, IOBehavior ioBehavior)
62+
{
63+
cache.Clear();
64+
return DoReadPayloadAsync(bufferedByteReader, byteHandler, getNextSequenceNumber, cache, protocolErrorBehavior, ioBehavior);
65+
}
66+
67+
private static ValueTask<ArraySegment<byte>> DoReadPayloadAsync(BufferedByteReader bufferedByteReader, IByteHandler byteHandler, Func<int> getNextSequenceNumber, ArraySegmentHolder<byte> previousPayloads, ProtocolErrorBehavior protocolErrorBehavior, IOBehavior ioBehavior)
6268
{
6369
var readPacketTask = ReadPacketAsync(bufferedByteReader, byteHandler, getNextSequenceNumber, protocolErrorBehavior, ioBehavior);
6470
while (readPacketTask.IsCompleted)
6571
{
6672
ValueTask<ArraySegment<byte>> result;
67-
if (HasReadPayload(ref previousPayloads, readPacketTask.Result, protocolErrorBehavior, out result))
73+
if (HasReadPayload(previousPayloads, readPacketTask.Result, protocolErrorBehavior, out result))
6874
return result;
6975

7076
readPacketTask = ReadPacketAsync(bufferedByteReader, byteHandler, getNextSequenceNumber, protocolErrorBehavior, ioBehavior);
@@ -73,40 +79,40 @@ public static ValueTask<ArraySegment<byte>> ReadPayloadAsync(BufferedByteReader
7379
return AddContinuation(readPacketTask, bufferedByteReader, byteHandler, getNextSequenceNumber, previousPayloads, protocolErrorBehavior, ioBehavior);
7480

7581
// NOTE: use a local function (with no captures) to defer creation of lambda objects
76-
ValueTask<ArraySegment<byte>> AddContinuation(ValueTask<Packet> readPacketTask_, BufferedByteReader bufferedByteReader_, IByteHandler byteHandler_, Func<int> getNextSequenceNumber_, ArraySegment<byte> previousPayloads_, ProtocolErrorBehavior protocolErrorBehavior_, IOBehavior ioBehavior_)
82+
ValueTask<ArraySegment<byte>> AddContinuation(ValueTask<Packet> readPacketTask_, BufferedByteReader bufferedByteReader_, IByteHandler byteHandler_, Func<int> getNextSequenceNumber_, ArraySegmentHolder<byte> previousPayloads_, ProtocolErrorBehavior protocolErrorBehavior_, IOBehavior ioBehavior_)
7783
{
7884
return readPacketTask_.ContinueWith(packet =>
79-
HasReadPayload(ref previousPayloads_, packet, protocolErrorBehavior_, out var result_) ? result_ :
80-
ReadPayloadAsync(bufferedByteReader_, byteHandler_, getNextSequenceNumber_, previousPayloads_, protocolErrorBehavior_, ioBehavior_));
85+
HasReadPayload(previousPayloads_, packet, protocolErrorBehavior_, out var result_) ? result_ :
86+
DoReadPayloadAsync(bufferedByteReader_, byteHandler_, getNextSequenceNumber_, previousPayloads_, protocolErrorBehavior_, ioBehavior_));
8187
}
8288
}
8389

84-
private static bool HasReadPayload(ref ArraySegment<byte> previousPayloads, Packet packet, ProtocolErrorBehavior protocolErrorBehavior, out ValueTask<ArraySegment<byte>> result)
90+
private static bool HasReadPayload(ArraySegmentHolder<byte> previousPayloads, Packet packet, ProtocolErrorBehavior protocolErrorBehavior, out ValueTask<ArraySegment<byte>> result)
8591
{
8692
if (packet == null && protocolErrorBehavior == ProtocolErrorBehavior.Ignore)
8793
{
8894
result = default(ValueTask<ArraySegment<byte>>);
8995
return true;
9096
}
9197

92-
var previousPayloadsArray = previousPayloads.Array;
93-
if (previousPayloadsArray == null && packet.Contents.Count < MaxPacketSize)
98+
if (previousPayloads.Count == 0 && packet.Contents.Count < MaxPacketSize)
9499
{
95100
result = new ValueTask<ArraySegment<byte>>(packet.Contents);
96101
return true;
97102
}
98103

104+
var previousPayloadsArray = previousPayloads.Array;
99105
if (previousPayloadsArray == null)
100106
previousPayloadsArray = new byte[ProtocolUtility.MaxPacketSize + 1];
101107
else if (previousPayloads.Offset + previousPayloads.Count + packet.Contents.Count > previousPayloadsArray.Length)
102108
Array.Resize(ref previousPayloadsArray, previousPayloadsArray.Length * 2);
103109

104110
Buffer.BlockCopy(packet.Contents.Array, packet.Contents.Offset, previousPayloadsArray, previousPayloads.Offset + previousPayloads.Count, packet.Contents.Count);
105-
previousPayloads = new ArraySegment<byte>(previousPayloadsArray, previousPayloads.Offset, previousPayloads.Count + packet.Contents.Count);
111+
previousPayloads.ArraySegment = new ArraySegment<byte>(previousPayloadsArray, previousPayloads.Offset, previousPayloads.Count + packet.Contents.Count);
106112

107113
if (packet.Contents.Count < ProtocolUtility.MaxPacketSize)
108114
{
109-
result = new ValueTask<ArraySegment<byte>>(previousPayloads);
115+
result = new ValueTask<ArraySegment<byte>>(previousPayloads.ArraySegment);
110116
return true;
111117
}
112118

src/MySqlConnector/Protocol/Serialization/StandardPayloadHandler.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ public IByteHandler ByteHandler
2626
}
2727
}
2828

29-
public ValueTask<ArraySegment<byte>> ReadPayloadAsync(ProtocolErrorBehavior protocolErrorBehavior, IOBehavior ioBehavior) =>
30-
ProtocolUtility.ReadPayloadAsync(m_bufferedByteReader, m_byteHandler, m_getNextSequenceNumber, default(ArraySegment<byte>), protocolErrorBehavior, ioBehavior);
29+
public ValueTask<ArraySegment<byte>> ReadPayloadAsync(ArraySegmentHolder<byte> cache, ProtocolErrorBehavior protocolErrorBehavior, IOBehavior ioBehavior) =>
30+
ProtocolUtility.ReadPayloadAsync(m_bufferedByteReader, m_byteHandler, m_getNextSequenceNumber, cache, protocolErrorBehavior, ioBehavior);
3131

3232
public ValueTask<int> WritePayloadAsync(ArraySegment<byte> payload, IOBehavior ioBehavior) =>
3333
ProtocolUtility.WritePayloadAsync(m_byteHandler, m_getNextSequenceNumber, payload, ioBehavior);

src/MySqlConnector/Serialization/MySqlSession.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public MySqlSession()
2424
public MySqlSession(ConnectionPool pool, int poolGeneration)
2525
{
2626
m_lock = new object();
27+
m_payloadCache = new ArraySegmentHolder<byte>();
2728
CreatedUtc = DateTime.UtcNow;
2829
Pool = pool;
2930
PoolGeneration = poolGeneration;
@@ -167,7 +168,7 @@ public async Task DisposeAsync(IOBehavior ioBehavior, CancellationToken cancella
167168
{
168169
m_payloadHandler.StartNewConversation();
169170
await m_payloadHandler.WritePayloadAsync(QuitPayload.Create(), ioBehavior).ConfigureAwait(false);
170-
await m_payloadHandler.ReadPayloadAsync(ProtocolErrorBehavior.Ignore, ioBehavior).ConfigureAwait(false);
171+
await m_payloadHandler.ReadPayloadAsync(m_payloadCache, ProtocolErrorBehavior.Ignore, ioBehavior).ConfigureAwait(false);
171172
}
172173
catch (IOException)
173174
{
@@ -321,7 +322,7 @@ public ValueTask<PayloadData> ReceiveReplyAsync(IOBehavior ioBehavior, Cancellat
321322
try
322323
{
323324
VerifyConnected();
324-
task = m_payloadHandler.ReadPayloadAsync(ProtocolErrorBehavior.Throw, ioBehavior);
325+
task = m_payloadHandler.ReadPayloadAsync(m_payloadCache, ProtocolErrorBehavior.Throw, ioBehavior);
325326
}
326327
catch (Exception ex)
327328
{
@@ -690,6 +691,7 @@ private enum State
690691
}
691692

692693
readonly object m_lock;
694+
readonly ArraySegmentHolder<byte> m_payloadCache;
693695
State m_state;
694696
string m_hostname = "";
695697
TcpClient m_tcpClient;

0 commit comments

Comments
 (0)