Skip to content

Commit 801d977

Browse files
committed
Rewrite ReadPacketAsync to reduce allocations.
This reduces allocations in the "ManyRowsNewSync" scenario by 44%.
1 parent 0c0474d commit 801d977

File tree

1 file changed

+45
-36
lines changed

1 file changed

+45
-36
lines changed

src/MySqlConnector/Protocol/Serialization/ProtocolUtility.cs

Lines changed: 45 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -11,44 +11,53 @@ internal static class ProtocolUtility
1111
{
1212
public static ValueTask<Packet> ReadPacketAsync(BufferedByteReader bufferedByteReader, IByteHandler byteHandler, Func<int?> getNextSequenceNumber, ProtocolErrorBehavior protocolErrorBehavior, IOBehavior ioBehavior)
1313
{
14-
return bufferedByteReader.ReadBytesAsync(byteHandler, 4, ioBehavior)
15-
.ContinueWith(headerBytes =>
16-
{
17-
if (headerBytes.Count < 4)
18-
{
19-
return protocolErrorBehavior == ProtocolErrorBehavior.Throw ?
20-
ValueTaskExtensions.FromException<Packet>(new EndOfStreamException()) :
21-
default(ValueTask<Packet>);
22-
}
23-
24-
var payloadLength = (int) SerializationUtility.ReadUInt32(headerBytes.Array, headerBytes.Offset, 3);
25-
int packetSequenceNumber = headerBytes.Array[headerBytes.Offset + 3];
26-
27-
var expectedSequenceNumber = getNextSequenceNumber() % 256;
28-
if (expectedSequenceNumber.HasValue && packetSequenceNumber != expectedSequenceNumber.Value)
29-
{
30-
if (protocolErrorBehavior == ProtocolErrorBehavior.Ignore)
31-
return default(ValueTask<Packet>);
32-
33-
var exception = MySqlProtocolException.CreateForPacketOutOfOrder(expectedSequenceNumber.Value, packetSequenceNumber);
34-
return ValueTaskExtensions.FromException<Packet>(exception);
35-
}
36-
37-
return bufferedByteReader.ReadBytesAsync(byteHandler, payloadLength, ioBehavior)
38-
.ContinueWith(payloadBytes =>
39-
{
40-
if (payloadBytes.Count < payloadLength)
41-
{
42-
return protocolErrorBehavior == ProtocolErrorBehavior.Throw ?
43-
ValueTaskExtensions.FromException<Packet>(new EndOfStreamException()) :
44-
default(ValueTask<Packet>);
45-
}
46-
47-
return new ValueTask<Packet>(new Packet(packetSequenceNumber, payloadBytes));
48-
});
49-
});
14+
var headerBytesTask = bufferedByteReader.ReadBytesAsync(byteHandler, 4, ioBehavior);
15+
if (headerBytesTask.IsCompleted)
16+
return ReadPacketAfterHeader(headerBytesTask.Result, bufferedByteReader, byteHandler, getNextSequenceNumber, protocolErrorBehavior, ioBehavior);
17+
return AddContinuation(headerBytesTask, bufferedByteReader, byteHandler, getNextSequenceNumber, protocolErrorBehavior, ioBehavior);
18+
19+
// NOTE: use a local function (with no captures) to defer creation of lambda objects
20+
ValueTask<Packet> AddContinuation(ValueTask<ArraySegment<byte>> headerBytes_, BufferedByteReader bufferedByteReader_, IByteHandler byteHandler_, Func<int?> getNextSequenceNumber_, ProtocolErrorBehavior protocolErrorBehavior_, IOBehavior ioBehavior_) =>
21+
headerBytes_.ContinueWith(x => ReadPacketAfterHeader(x, bufferedByteReader_, byteHandler_, getNextSequenceNumber_, protocolErrorBehavior_, ioBehavior_));
22+
}
23+
24+
private static ValueTask<Packet> ReadPacketAfterHeader(ArraySegment<byte> headerBytes, BufferedByteReader bufferedByteReader, IByteHandler byteHandler, Func<int?> getNextSequenceNumber, ProtocolErrorBehavior protocolErrorBehavior, IOBehavior ioBehavior)
25+
{
26+
if (headerBytes.Count < 4)
27+
{
28+
return protocolErrorBehavior == ProtocolErrorBehavior.Throw ?
29+
ValueTaskExtensions.FromException<Packet>(new EndOfStreamException()) :
30+
default(ValueTask<Packet>);
31+
}
32+
33+
var payloadLength = (int) SerializationUtility.ReadUInt32(headerBytes.Array, headerBytes.Offset, 3);
34+
int packetSequenceNumber = headerBytes.Array[headerBytes.Offset + 3];
35+
36+
var expectedSequenceNumber = getNextSequenceNumber() % 256;
37+
if (expectedSequenceNumber.HasValue && packetSequenceNumber != expectedSequenceNumber.Value)
38+
{
39+
if (protocolErrorBehavior == ProtocolErrorBehavior.Ignore)
40+
return default(ValueTask<Packet>);
41+
42+
var exception = MySqlProtocolException.CreateForPacketOutOfOrder(expectedSequenceNumber.Value, packetSequenceNumber);
43+
return ValueTaskExtensions.FromException<Packet>(exception);
44+
}
45+
46+
var payloadBytesTask = bufferedByteReader.ReadBytesAsync(byteHandler, payloadLength, ioBehavior);
47+
if (payloadBytesTask.IsCompleted)
48+
return CreatePacketFromPayload(payloadBytesTask.Result, payloadLength, packetSequenceNumber, protocolErrorBehavior);
49+
return AddContinuation(payloadBytesTask, payloadLength, packetSequenceNumber, protocolErrorBehavior);
50+
51+
// NOTE: use a local function (with no captures) to defer creation of lambda objects
52+
ValueTask<Packet> AddContinuation(ValueTask<ArraySegment<byte>> payloadBytesTask_, int payloadLength_, int packetSequenceNumber_, ProtocolErrorBehavior protocolErrorBehavior_)
53+
=> payloadBytesTask_.ContinueWith(x => CreatePacketFromPayload(x, payloadLength_, packetSequenceNumber_, protocolErrorBehavior_));
5054
}
5155

56+
private static ValueTask<Packet> CreatePacketFromPayload(ArraySegment<byte> payloadBytes, int payloadLength, int packetSequenceNumber, ProtocolErrorBehavior protocolErrorBehavior) =>
57+
payloadBytes.Count >= payloadLength ? new ValueTask<Packet>(new Packet(packetSequenceNumber, payloadBytes)) :
58+
protocolErrorBehavior == ProtocolErrorBehavior.Throw ? ValueTaskExtensions.FromException<Packet>(new EndOfStreamException()) :
59+
default(ValueTask<Packet>);
60+
5261
public static ValueTask<ArraySegment<byte>> ReadPayloadAsync(BufferedByteReader bufferedByteReader, IByteHandler byteHandler, Func<int?> getNextSequenceNumber, ArraySegment<byte> previousPayloads, ProtocolErrorBehavior protocolErrorBehavior, IOBehavior ioBehavior)
5362
{
5463
var readPacketTask = ReadPacketAsync(bufferedByteReader, byteHandler, getNextSequenceNumber, protocolErrorBehavior, ioBehavior);

0 commit comments

Comments
 (0)