Skip to content

Commit fd8fb0f

Browse files
djemanDidier Fracassibgrainger
authored
Implement COM_STMT_SEND_LONG_DATA. Fixes #943 (#1579)
Signed-off-by: Bradley Grainger <[email protected]> Co-authored-by: Didier Fracassi <[email protected]> Co-authored-by: Bradley Grainger <[email protected]>
1 parent 7f853e3 commit fd8fb0f

File tree

10 files changed

+362
-2
lines changed

10 files changed

+362
-2
lines changed

docs/content/troubleshooting/parameter-types.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ In some cases, this may be as simple as calling `.ToString()` or `.ToString(Cult
3838

3939
* .NET primitives: `bool`, `byte`, `char`, `double`, `float`, `int`, `long`, `sbyte`, `short`, `uint`, `ulong`, `ushort`
4040
* Common types: `BigInteger`, `DateOnly`, `DateTime`, `DateTimeOffset`, `decimal`, `enum`, `Guid`, `string`, `TimeOnly`, `TimeSpan`
41-
* BLOB types: `ArraySegment<byte>`, `byte[]`, `Memory<byte>`, `ReadOnlyMemory<byte>`
41+
* BLOB types: `ArraySegment<byte>`, `byte[]`, `Memory<byte>`, `ReadOnlyMemory<byte>`. `MemoryStream`
42+
* NOTE: `System.IO.Stream` and derived types (other than `MemoryStream`) are supported only when `MySqlCommand.Prepare` is called first. The data in the `Stream` will be streamed to the database server as binary data.
4243
* Vector types: `float[]`, `Memory<float>`, `ReadOnlyMemory<float>`
4344
* String types: `Memory<char>`, `ReadOnlyMemory<char>`, `StringBuilder`
4445
* Custom MySQL types: `MySqlDateTime`, `MySqlDecimal`, `MySqlGeometry`

src/MySqlConnector/Core/CommandExecutor.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ public static async ValueTask<MySqlDataReader> ExecuteReaderAsync(CommandListPos
3939
}
4040
}
4141

42+
await payloadCreator.SendCommandPrologueAsync(connection, commandListPosition, ioBehavior, cancellationToken).ConfigureAwait(false);
43+
4244
var writer = new ByteBufferWriter();
4345
//// cachedProcedures will be non-null if there is a stored procedure, which is also the only time it will be read
4446
if (!payloadCreator.WriteQueryCommand(ref commandListPosition, cachedProcedures!, writer, false))

src/MySqlConnector/Core/ConcatenatedCommandPayloadCreator.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ internal sealed class ConcatenatedCommandPayloadCreator : ICommandPayloadCreator
88
{
99
public static ICommandPayloadCreator Instance { get; } = new ConcatenatedCommandPayloadCreator();
1010

11+
public ValueTask SendCommandPrologueAsync(MySqlConnection connection, CommandListPosition commandListPosition, IOBehavior ioBehavior, CancellationToken cancellationToken) =>
12+
default;
13+
1114
public bool WriteQueryCommand(ref CommandListPosition commandListPosition, IDictionary<string, CachedProcedure?> cachedProcedures, ByteBufferWriter writer, bool appendSemicolon)
1215
{
1316
if (commandListPosition.CommandIndex == commandListPosition.CommandCount)

src/MySqlConnector/Core/ICommandPayloadCreator.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,16 @@ namespace MySqlConnector.Core;
77
/// </summary>
88
internal interface ICommandPayloadCreator
99
{
10+
/// <summary>
11+
/// Sends any data that is required to be sent to the server before the current command in the command list.
12+
/// </summary>
13+
/// <param name="connection">The <see cref="MySqlConnection"/> to which the data will be written.</param>
14+
/// <param name="commandListPosition">The <see cref="CommandListPosition"/> giving the current command and current prepared statement.</param>
15+
/// <param name="ioBehavior">The IO behavior.</param>
16+
/// <param name="cancellationToken">A cancellation token to cancel the asynchronous operation.</param>
17+
/// <returns>A <see cref="ValueTask"/> representing the asynchronous operation or a completed <see cref="ValueTask"/> if no data needed to be sent.</returns>
18+
ValueTask SendCommandPrologueAsync(MySqlConnection connection, CommandListPosition commandListPosition, IOBehavior ioBehavior, CancellationToken cancellationToken);
19+
1020
/// <summary>
1121
/// Writes the payload for an "execute query" command to <paramref name="writer"/>.
1222
/// </summary>

src/MySqlConnector/Core/SingleCommandPayloadCreator.cs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
using System.Buffers;
2+
using System.Buffers.Binary;
3+
using System.Runtime.InteropServices;
14
using MySqlConnector.Logging;
25
using MySqlConnector.Protocol;
36
using MySqlConnector.Protocol.Serialization;
@@ -12,6 +15,81 @@ internal sealed class SingleCommandPayloadCreator : ICommandPayloadCreator
1215
// with this as the first column name, the result set will be treated as 'out' parameters for the previous command.
1316
public static string OutParameterSentinelColumnName => "\uE001\b\x0B";
1417

18+
public async ValueTask SendCommandPrologueAsync(MySqlConnection connection, CommandListPosition commandListPosition, IOBehavior ioBehavior, CancellationToken cancellationToken)
19+
{
20+
// get the current command and check for prepared statements
21+
var command = commandListPosition.CommandAt(commandListPosition.CommandIndex);
22+
var preparedStatements = commandListPosition.PreparedStatements ?? command.TryGetPreparedStatements();
23+
if (preparedStatements is not null)
24+
{
25+
// get the current prepared statement; WriteQueryCommand will advance this
26+
var preparedStatement = preparedStatements.Statements[commandListPosition.PreparedStatementIndex];
27+
if (preparedStatement.Parameters is { } parameters)
28+
{
29+
byte[]? buffer = null;
30+
try
31+
{
32+
// check each parameter
33+
for (var i = 0; i < parameters.Length; i++)
34+
{
35+
// look up this parameter in the command's parameter collection and check if it is a Stream
36+
// NOTE: full parameter checks will be performed (and throw any necessary exceptions) in WritePreparedStatement
37+
var parameterName = preparedStatement.Statement.NormalizedParameterNames![i];
38+
var parameterIndex = parameterName is not null ? (command.RawParameters?.UnsafeIndexOf(parameterName) ?? -1) : preparedStatement.Statement.ParameterIndexes[i];
39+
if (command.RawParameters is { } rawParameters && parameterIndex >= 0 && parameterIndex < rawParameters.Count && rawParameters[parameterIndex] is { Value: Stream stream and not MemoryStream })
40+
{
41+
// seven-byte COM_STMT_SEND_LONG_DATA header: https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_stmt_send_long_data.html
42+
const int packetHeaderLength = 7;
43+
44+
// send almost-full packets, but don't send exactly ProtocolUtility.MaxPacketSize bytes in one payload (as that's ambiguous to whether another packet follows).
45+
const int maxDataSize = 16_000_000;
46+
int totalBytesRead;
47+
while (true)
48+
{
49+
buffer ??= ArrayPool<byte>.Shared.Rent(packetHeaderLength + maxDataSize);
50+
buffer[0] = (byte) CommandKind.StatementSendLongData;
51+
BinaryPrimitives.WriteInt32LittleEndian(buffer.AsSpan(1), preparedStatement.StatementId);
52+
BinaryPrimitives.WriteUInt16LittleEndian(buffer.AsSpan(5), (ushort) i);
53+
54+
// keep reading from the stream until we've filled the buffer to send
55+
#if NET7_0_OR_GREATER
56+
if (ioBehavior == IOBehavior.Synchronous)
57+
totalBytesRead = stream.ReadAtLeast(buffer.AsSpan(packetHeaderLength, maxDataSize), maxDataSize, throwOnEndOfStream: false);
58+
else
59+
totalBytesRead = await stream.ReadAtLeastAsync(buffer.AsMemory(packetHeaderLength, maxDataSize), maxDataSize, throwOnEndOfStream: false, cancellationToken).ConfigureAwait(false);
60+
#else
61+
totalBytesRead = 0;
62+
int bytesRead;
63+
do
64+
{
65+
var sizeToRead = maxDataSize - totalBytesRead;
66+
if (ioBehavior == IOBehavior.Synchronous)
67+
bytesRead = stream.Read(buffer, packetHeaderLength + totalBytesRead, sizeToRead);
68+
else
69+
bytesRead = await stream.ReadAsync(buffer, packetHeaderLength + totalBytesRead, sizeToRead, cancellationToken).ConfigureAwait(false);
70+
totalBytesRead += bytesRead;
71+
} while (bytesRead > 0 && totalBytesRead < maxDataSize);
72+
#endif
73+
74+
if (totalBytesRead == 0)
75+
break;
76+
77+
// send StatementSendLongData; MySQL Server will keep appending the sent data to the parameter value
78+
using var payload = new PayloadData(buffer.AsMemory(0, packetHeaderLength + totalBytesRead), isPooled: false);
79+
await connection.Session.SendAsync(payload, ioBehavior, cancellationToken).ConfigureAwait(false);
80+
}
81+
}
82+
}
83+
}
84+
finally
85+
{
86+
if (buffer is not null)
87+
ArrayPool<byte>.Shared.Return(buffer);
88+
}
89+
}
90+
}
91+
}
92+
1593
public bool WriteQueryCommand(ref CommandListPosition commandListPosition, IDictionary<string, CachedProcedure?> cachedProcedures, ByteBufferWriter writer, bool appendSemicolon)
1694
{
1795
if (commandListPosition.CommandIndex == commandListPosition.CommandCount)
@@ -58,6 +136,7 @@ public bool WriteQueryCommand(ref CommandListPosition commandListPosition, IDict
58136
{
59137
commandListPosition.CommandIndex++;
60138
commandListPosition.PreparedStatementIndex = 0;
139+
commandListPosition.PreparedStatements = null;
61140
}
62141
}
63142
return true;

src/MySqlConnector/MySqlDataReader.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,10 @@ internal async Task<bool> NextResultAsync(IOBehavior ioBehavior, CancellationTok
7474
Command = m_commandListPosition.CommandAt(m_commandListPosition.CommandIndex);
7575
using (Command.CancellableCommand.RegisterCancel(cancellationToken))
7676
{
77+
await m_payloadCreator!.SendCommandPrologueAsync(Command.Connection!, m_commandListPosition, ioBehavior, cancellationToken).ConfigureAwait(false);
78+
7779
var writer = new ByteBufferWriter();
78-
if (!Command.Connection!.Session.IsCancelingQuery && m_payloadCreator!.WriteQueryCommand(ref m_commandListPosition, m_cachedProcedures!, writer, false))
80+
if (!Command.Connection!.Session.IsCancelingQuery && m_payloadCreator.WriteQueryCommand(ref m_commandListPosition, m_cachedProcedures!, writer, false))
7981
{
8082
using var payload = writer.ToPayloadData();
8183
await Command.Connection.Session.SendAsync(payload, ioBehavior, cancellationToken).ConfigureAwait(false);

src/MySqlConnector/MySqlParameter.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,10 @@ internal void AppendSqlString(ByteBufferWriter writer, StatementPreparerOptions
331331
Debug.Assert(index == length, "index == length");
332332
writer.Advance(index);
333333
}
334+
else if (Value is Stream)
335+
{
336+
throw new NotSupportedException($"Parameter type {Value.GetType().Name} can only be used after calling MySqlCommand.Prepare.");
337+
}
334338
else if (Value is bool boolValue)
335339
{
336340
writer.Write(boolValue ? "true"u8 : "false"u8);
@@ -728,6 +732,10 @@ private void AppendBinary(ByteBufferWriter writer, object value, StatementPrepar
728732
writer.WriteLengthEncodedInteger(unchecked((ulong) streamBuffer.Count));
729733
writer.Write(streamBuffer);
730734
}
735+
else if (value is Stream)
736+
{
737+
// do nothing; this will be sent via CommandKind.StatementSendLongData
738+
}
731739
else if (value is float floatValue)
732740
{
733741
#if NET5_0_OR_GREATER

src/MySqlConnector/Protocol/CommandKind.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,6 @@ internal enum CommandKind
99
ChangeUser = 17,
1010
StatementPrepare = 22,
1111
StatementExecute = 23,
12+
StatementSendLongData = 24,
1213
ResetConnection = 31,
1314
}

tests/IntegrationTests/ChunkStream.cs

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
namespace IntegrationTests;
2+
3+
internal sealed class ChunkStream : Stream
4+
{
5+
public ChunkStream(byte[] data, int chunkLength)
6+
{
7+
if (data is null)
8+
throw new ArgumentNullException(nameof(data));
9+
if (chunkLength <= 0)
10+
throw new ArgumentOutOfRangeException(nameof(chunkLength));
11+
12+
m_data = data;
13+
m_chunkLength = chunkLength;
14+
m_position = 0;
15+
}
16+
17+
public override bool CanRead => true;
18+
public override bool CanSeek => false;
19+
public override bool CanWrite => false;
20+
public override long Length => m_data.Length;
21+
public override long Position
22+
{
23+
get => m_position;
24+
set => throw new NotSupportedException();
25+
}
26+
27+
public override int Read(byte[] buffer, int offset, int count)
28+
{
29+
if (buffer is null)
30+
throw new ArgumentNullException(nameof(buffer));
31+
if (offset < 0 || offset > buffer.Length)
32+
throw new ArgumentOutOfRangeException(nameof(offset));
33+
if (count < 0 || offset + count > buffer.Length)
34+
throw new ArgumentOutOfRangeException(nameof(count));
35+
36+
return Read(buffer.AsSpan(offset, count));
37+
}
38+
39+
public
40+
#if NETSTANDARD2_1 || NETCOREAPP2_1_OR_GREATER
41+
override
42+
#endif
43+
int Read(Span<byte> buffer)
44+
{
45+
if (m_position >= m_data.Length)
46+
return 0;
47+
48+
// Read at most chunkLength bytes
49+
var bytesToRead = Math.Min(buffer.Length, Math.Min(m_chunkLength, m_data.Length - m_position));
50+
51+
// Copy data from the actual data array
52+
m_data.AsSpan(m_position, bytesToRead).CopyTo(buffer);
53+
54+
m_position += bytesToRead;
55+
return bytesToRead;
56+
}
57+
58+
public override int ReadByte()
59+
{
60+
Span<byte> buffer = stackalloc byte[1];
61+
var bytesRead = Read(buffer);
62+
return bytesRead == 0 ? -1 : buffer[0];
63+
}
64+
65+
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
66+
{
67+
if (buffer is null)
68+
throw new ArgumentNullException(nameof(buffer));
69+
if (offset < 0 || offset > buffer.Length)
70+
throw new ArgumentOutOfRangeException(nameof(offset));
71+
if (count < 0 || offset + count > buffer.Length)
72+
throw new ArgumentOutOfRangeException(nameof(count));
73+
74+
if (cancellationToken.IsCancellationRequested)
75+
return Task.FromCanceled<int>(cancellationToken);
76+
77+
try
78+
{
79+
return Task.FromResult(Read(buffer.AsSpan(offset, count)));
80+
}
81+
catch (Exception ex)
82+
{
83+
return Task.FromException<int>(ex);
84+
}
85+
}
86+
87+
public
88+
#if NETSTANDARD2_1 || NETCOREAPP2_1_OR_GREATER
89+
override
90+
#endif
91+
ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
92+
{
93+
if (cancellationToken.IsCancellationRequested)
94+
return new(Task.FromCanceled<int>(cancellationToken));
95+
96+
try
97+
{
98+
return new(Read(buffer.Span));
99+
}
100+
catch (Exception ex)
101+
{
102+
return new(Task.FromException<int>(ex));
103+
}
104+
}
105+
106+
public override void Write(byte[] buffer, int offset, int count) =>
107+
throw new NotSupportedException();
108+
109+
public override void WriteByte(byte value) =>
110+
throw new NotSupportedException();
111+
112+
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
113+
throw new NotSupportedException();
114+
115+
public override void SetLength(long value) =>
116+
throw new NotSupportedException();
117+
118+
public override long Seek(long offset, SeekOrigin origin) =>
119+
throw new NotSupportedException();
120+
121+
public override void Flush() =>
122+
throw new NotSupportedException();
123+
124+
public override Task FlushAsync(CancellationToken cancellationToken) =>
125+
throw new NotSupportedException();
126+
127+
private readonly byte[] m_data;
128+
private readonly int m_chunkLength;
129+
private int m_position;
130+
}

0 commit comments

Comments
 (0)