Skip to content

Implement COM_STMT_SEND_LONG_DATA #1579

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Jul 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/content/troubleshooting/parameter-types.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ In some cases, this may be as simple as calling `.ToString()` or `.ToString(Cult

* .NET primitives: `bool`, `byte`, `char`, `double`, `float`, `int`, `long`, `sbyte`, `short`, `uint`, `ulong`, `ushort`
* Common types: `BigInteger`, `DateOnly`, `DateTime`, `DateTimeOffset`, `decimal`, `enum`, `Guid`, `string`, `TimeOnly`, `TimeSpan`
* BLOB types: `ArraySegment<byte>`, `byte[]`, `Memory<byte>`, `ReadOnlyMemory<byte>`
* BLOB types: `ArraySegment<byte>`, `byte[]`, `Memory<byte>`, `ReadOnlyMemory<byte>`. `MemoryStream`
* NOTE: `System.IO.Stream` and derived types (other than `MemoryStream`) are supported only when `MySqlCommand.Prepare` is called first. The data in the `Stream` will be streamed to the database server as binary data.
* Vector types: `float[]`, `Memory<float>`, `ReadOnlyMemory<float>`
* String types: `Memory<char>`, `ReadOnlyMemory<char>`, `StringBuilder`
* Custom MySQL types: `MySqlDateTime`, `MySqlDecimal`, `MySqlGeometry`
2 changes: 2 additions & 0 deletions src/MySqlConnector/Core/CommandExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public static async ValueTask<MySqlDataReader> ExecuteReaderAsync(CommandListPos
}
}

await payloadCreator.SendCommandPrologueAsync(connection, commandListPosition, ioBehavior, cancellationToken).ConfigureAwait(false);

var writer = new ByteBufferWriter();
//// cachedProcedures will be non-null if there is a stored procedure, which is also the only time it will be read
if (!payloadCreator.WriteQueryCommand(ref commandListPosition, cachedProcedures!, writer, false))
Expand Down
3 changes: 3 additions & 0 deletions src/MySqlConnector/Core/ConcatenatedCommandPayloadCreator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ internal sealed class ConcatenatedCommandPayloadCreator : ICommandPayloadCreator
{
public static ICommandPayloadCreator Instance { get; } = new ConcatenatedCommandPayloadCreator();

public ValueTask SendCommandPrologueAsync(MySqlConnection connection, CommandListPosition commandListPosition, IOBehavior ioBehavior, CancellationToken cancellationToken) =>
default;

public bool WriteQueryCommand(ref CommandListPosition commandListPosition, IDictionary<string, CachedProcedure?> cachedProcedures, ByteBufferWriter writer, bool appendSemicolon)
{
if (commandListPosition.CommandIndex == commandListPosition.CommandCount)
Expand Down
10 changes: 10 additions & 0 deletions src/MySqlConnector/Core/ICommandPayloadCreator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ namespace MySqlConnector.Core;
/// </summary>
internal interface ICommandPayloadCreator
{
/// <summary>
/// Sends any data that is required to be sent to the server before the current command in the command list.
/// </summary>
/// <param name="connection">The <see cref="MySqlConnection"/> to which the data will be written.</param>
/// <param name="commandListPosition">The <see cref="CommandListPosition"/> giving the current command and current prepared statement.</param>
/// <param name="ioBehavior">The IO behavior.</param>
/// <param name="cancellationToken">A cancellation token to cancel the asynchronous operation.</param>
/// <returns>A <see cref="ValueTask"/> representing the asynchronous operation or a completed <see cref="ValueTask"/> if no data needed to be sent.</returns>
ValueTask SendCommandPrologueAsync(MySqlConnection connection, CommandListPosition commandListPosition, IOBehavior ioBehavior, CancellationToken cancellationToken);

/// <summary>
/// Writes the payload for an "execute query" command to <paramref name="writer"/>.
/// </summary>
Expand Down
79 changes: 79 additions & 0 deletions src/MySqlConnector/Core/SingleCommandPayloadCreator.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
using System.Buffers;
using System.Buffers.Binary;
using System.Runtime.InteropServices;
using MySqlConnector.Logging;
using MySqlConnector.Protocol;
using MySqlConnector.Protocol.Serialization;
Expand All @@ -12,6 +15,81 @@ internal sealed class SingleCommandPayloadCreator : ICommandPayloadCreator
// with this as the first column name, the result set will be treated as 'out' parameters for the previous command.
public static string OutParameterSentinelColumnName => "\uE001\b\x0B";

public async ValueTask SendCommandPrologueAsync(MySqlConnection connection, CommandListPosition commandListPosition, IOBehavior ioBehavior, CancellationToken cancellationToken)
{
// get the current command and check for prepared statements
var command = commandListPosition.CommandAt(commandListPosition.CommandIndex);
var preparedStatements = commandListPosition.PreparedStatements ?? command.TryGetPreparedStatements();
if (preparedStatements is not null)
{
// get the current prepared statement; WriteQueryCommand will advance this
var preparedStatement = preparedStatements.Statements[commandListPosition.PreparedStatementIndex];
if (preparedStatement.Parameters is { } parameters)
{
byte[]? buffer = null;
try
{
// check each parameter
for (var i = 0; i < parameters.Length; i++)
{
// look up this parameter in the command's parameter collection and check if it is a Stream
// NOTE: full parameter checks will be performed (and throw any necessary exceptions) in WritePreparedStatement
var parameterName = preparedStatement.Statement.NormalizedParameterNames![i];
var parameterIndex = parameterName is not null ? (command.RawParameters?.UnsafeIndexOf(parameterName) ?? -1) : preparedStatement.Statement.ParameterIndexes[i];
if (command.RawParameters is { } rawParameters && parameterIndex >= 0 && parameterIndex < rawParameters.Count && rawParameters[parameterIndex] is { Value: Stream stream and not MemoryStream })
{
// seven-byte COM_STMT_SEND_LONG_DATA header: https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_stmt_send_long_data.html
const int packetHeaderLength = 7;

// send almost-full packets, but don't send exactly ProtocolUtility.MaxPacketSize bytes in one payload (as that's ambiguous to whether another packet follows).
const int maxDataSize = 16_000_000;
int totalBytesRead;
while (true)
{
buffer ??= ArrayPool<byte>.Shared.Rent(packetHeaderLength + maxDataSize);
buffer[0] = (byte) CommandKind.StatementSendLongData;
BinaryPrimitives.WriteInt32LittleEndian(buffer.AsSpan(1), preparedStatement.StatementId);
BinaryPrimitives.WriteUInt16LittleEndian(buffer.AsSpan(5), (ushort) i);

// keep reading from the stream until we've filled the buffer to send
#if NET7_0_OR_GREATER
if (ioBehavior == IOBehavior.Synchronous)
totalBytesRead = stream.ReadAtLeast(buffer.AsSpan(packetHeaderLength, maxDataSize), maxDataSize, throwOnEndOfStream: false);
else
totalBytesRead = await stream.ReadAtLeastAsync(buffer.AsMemory(packetHeaderLength, maxDataSize), maxDataSize, throwOnEndOfStream: false, cancellationToken).ConfigureAwait(false);
#else
totalBytesRead = 0;
int bytesRead;
do
{
var sizeToRead = maxDataSize - totalBytesRead;
if (ioBehavior == IOBehavior.Synchronous)
bytesRead = stream.Read(buffer, packetHeaderLength + totalBytesRead, sizeToRead);
else
bytesRead = await stream.ReadAsync(buffer, packetHeaderLength + totalBytesRead, sizeToRead, cancellationToken).ConfigureAwait(false);
totalBytesRead += bytesRead;
} while (bytesRead > 0 && totalBytesRead < maxDataSize);
#endif

if (totalBytesRead == 0)
break;

// send StatementSendLongData; MySQL Server will keep appending the sent data to the parameter value
using var payload = new PayloadData(buffer.AsMemory(0, packetHeaderLength + totalBytesRead), isPooled: false);
await connection.Session.SendAsync(payload, ioBehavior, cancellationToken).ConfigureAwait(false);
}
}
}
}
finally
{
if (buffer is not null)
ArrayPool<byte>.Shared.Return(buffer);
}
}
}
}

public bool WriteQueryCommand(ref CommandListPosition commandListPosition, IDictionary<string, CachedProcedure?> cachedProcedures, ByteBufferWriter writer, bool appendSemicolon)
{
if (commandListPosition.CommandIndex == commandListPosition.CommandCount)
Expand Down Expand Up @@ -58,6 +136,7 @@ public bool WriteQueryCommand(ref CommandListPosition commandListPosition, IDict
{
commandListPosition.CommandIndex++;
commandListPosition.PreparedStatementIndex = 0;
commandListPosition.PreparedStatements = null;
}
}
return true;
Expand Down
4 changes: 3 additions & 1 deletion src/MySqlConnector/MySqlDataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,10 @@ internal async Task<bool> NextResultAsync(IOBehavior ioBehavior, CancellationTok
Command = m_commandListPosition.CommandAt(m_commandListPosition.CommandIndex);
using (Command.CancellableCommand.RegisterCancel(cancellationToken))
{
await m_payloadCreator!.SendCommandPrologueAsync(Command.Connection!, m_commandListPosition, ioBehavior, cancellationToken).ConfigureAwait(false);

var writer = new ByteBufferWriter();
if (!Command.Connection!.Session.IsCancelingQuery && m_payloadCreator!.WriteQueryCommand(ref m_commandListPosition, m_cachedProcedures!, writer, false))
if (!Command.Connection!.Session.IsCancelingQuery && m_payloadCreator.WriteQueryCommand(ref m_commandListPosition, m_cachedProcedures!, writer, false))
{
using var payload = writer.ToPayloadData();
await Command.Connection.Session.SendAsync(payload, ioBehavior, cancellationToken).ConfigureAwait(false);
Expand Down
8 changes: 8 additions & 0 deletions src/MySqlConnector/MySqlParameter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,10 @@ internal void AppendSqlString(ByteBufferWriter writer, StatementPreparerOptions
Debug.Assert(index == length, "index == length");
writer.Advance(index);
}
else if (Value is Stream)
{
throw new NotSupportedException($"Parameter type {Value.GetType().Name} can only be used after calling MySqlCommand.Prepare.");
}
else if (Value is bool boolValue)
{
writer.Write(boolValue ? "true"u8 : "false"u8);
Expand Down Expand Up @@ -728,6 +732,10 @@ private void AppendBinary(ByteBufferWriter writer, object value, StatementPrepar
writer.WriteLengthEncodedInteger(unchecked((ulong) streamBuffer.Count));
writer.Write(streamBuffer);
}
else if (value is Stream)
{
// do nothing; this will be sent via CommandKind.StatementSendLongData
}
else if (value is float floatValue)
{
#if NET5_0_OR_GREATER
Expand Down
1 change: 1 addition & 0 deletions src/MySqlConnector/Protocol/CommandKind.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ internal enum CommandKind
ChangeUser = 17,
StatementPrepare = 22,
StatementExecute = 23,
StatementSendLongData = 24,
ResetConnection = 31,
}
130 changes: 130 additions & 0 deletions tests/IntegrationTests/ChunkStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
namespace IntegrationTests;

internal sealed class ChunkStream : Stream
{
public ChunkStream(byte[] data, int chunkLength)
{
if (data is null)
throw new ArgumentNullException(nameof(data));
if (chunkLength <= 0)
throw new ArgumentOutOfRangeException(nameof(chunkLength));

m_data = data;
m_chunkLength = chunkLength;
m_position = 0;
}

public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => false;
public override long Length => m_data.Length;
public override long Position
{
get => m_position;
set => throw new NotSupportedException();
}

public override int Read(byte[] buffer, int offset, int count)
{
if (buffer is null)
throw new ArgumentNullException(nameof(buffer));
if (offset < 0 || offset > buffer.Length)
throw new ArgumentOutOfRangeException(nameof(offset));
if (count < 0 || offset + count > buffer.Length)
throw new ArgumentOutOfRangeException(nameof(count));

return Read(buffer.AsSpan(offset, count));
}

public
#if NETSTANDARD2_1 || NETCOREAPP2_1_OR_GREATER
override
#endif
int Read(Span<byte> buffer)
{
if (m_position >= m_data.Length)
return 0;

// Read at most chunkLength bytes
var bytesToRead = Math.Min(buffer.Length, Math.Min(m_chunkLength, m_data.Length - m_position));

// Copy data from the actual data array
m_data.AsSpan(m_position, bytesToRead).CopyTo(buffer);

m_position += bytesToRead;
return bytesToRead;
}

public override int ReadByte()
{
Span<byte> buffer = stackalloc byte[1];
var bytesRead = Read(buffer);
return bytesRead == 0 ? -1 : buffer[0];
}

public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
if (buffer is null)
throw new ArgumentNullException(nameof(buffer));
if (offset < 0 || offset > buffer.Length)
throw new ArgumentOutOfRangeException(nameof(offset));
if (count < 0 || offset + count > buffer.Length)
throw new ArgumentOutOfRangeException(nameof(count));

if (cancellationToken.IsCancellationRequested)
return Task.FromCanceled<int>(cancellationToken);

try
{
return Task.FromResult(Read(buffer.AsSpan(offset, count)));
}
catch (Exception ex)
{
return Task.FromException<int>(ex);
}
}

public
#if NETSTANDARD2_1 || NETCOREAPP2_1_OR_GREATER
override
#endif
ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
if (cancellationToken.IsCancellationRequested)
return new(Task.FromCanceled<int>(cancellationToken));

try
{
return new(Read(buffer.Span));
}
catch (Exception ex)
{
return new(Task.FromException<int>(ex));
}
}

public override void Write(byte[] buffer, int offset, int count) =>
throw new NotSupportedException();

public override void WriteByte(byte value) =>
throw new NotSupportedException();

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
throw new NotSupportedException();

public override void SetLength(long value) =>
throw new NotSupportedException();

public override long Seek(long offset, SeekOrigin origin) =>
throw new NotSupportedException();

public override void Flush() =>
throw new NotSupportedException();

public override Task FlushAsync(CancellationToken cancellationToken) =>
throw new NotSupportedException();

private readonly byte[] m_data;
private readonly int m_chunkLength;
private int m_position;
}
Loading