Skip to content

Implement COM_STMT_SEND_LONG_DATA #1579

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Jul 22, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 58 additions & 3 deletions src/MySqlConnector/MySqlCommand.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System.Diagnostics.CodeAnalysis;
using System.Runtime.InteropServices;
using Microsoft.Extensions.Logging;
using MySqlConnector.Core;
using MySqlConnector.Protocol;
using MySqlConnector.Protocol.Serialization;
using MySqlConnector.Utilities;

Expand Down Expand Up @@ -357,15 +359,68 @@ internal async ValueTask<MySqlDataReader> ExecuteReaderAsync(CommandBehavior beh
return await ExecuteReaderNoResetTimeoutAsync(behavior, ioBehavior, cancellationToken).ConfigureAwait(false);
}

internal ValueTask<MySqlDataReader> ExecuteReaderNoResetTimeoutAsync(CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken cancellationToken)
internal async ValueTask<MySqlDataReader> ExecuteReaderNoResetTimeoutAsync(CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken cancellationToken)
{
if (!IsValid(out var exception))
return ValueTaskExtensions.FromException<MySqlDataReader>(exception);
return await ValueTaskExtensions.FromException<MySqlDataReader>(exception).ConfigureAwait(false);

if (((IMySqlCommand) this).TryGetPreparedStatements() is { Statements.Count: 1 } statements)
{
for (var i = 0; i < Parameters.Count; i++)
{
if (Parameters[i].Value is Stream stream and not MemoryStream)
{
// send almost-full packets, but don't send exactly ProtocolUtility.MaxPacketSize bytes in one payload (as that's ambiguous to whether another packet follows).
const int maxDataSize = 16_000_000;
int totalBytesRead;
while (true)
{
// write seven-byte COM_STMT_SEND_LONG_DATA header: https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_stmt_send_long_data.html
var writer = new ByteBufferWriter(maxDataSize);
writer.Write((byte) CommandKind.StatementSendLongData);
writer.Write(statements.Statements[0].StatementId);
writer.Write((ushort) i);

// keep reading from the stream until we've filled the buffer to send
#if NET7_0_OR_GREATER
if (ioBehavior == IOBehavior.Synchronous)
totalBytesRead = stream.ReadAtLeast(writer.GetSpan(maxDataSize).Slice(0, maxDataSize), maxDataSize, throwOnEndOfStream: false);
else
totalBytesRead = await stream.ReadAtLeastAsync(writer.GetMemory(maxDataSize).Slice(0, maxDataSize), maxDataSize, throwOnEndOfStream: false, cancellationToken).ConfigureAwait(false);
writer.Advance(totalBytesRead);
#else
totalBytesRead = 0;
int bytesRead;
do
{
var sizeToRead = maxDataSize - totalBytesRead;
ReadOnlyMemory<byte> bufferMemory = writer.GetMemory(sizeToRead);
if (!MemoryMarshal.TryGetArray(bufferMemory, out var arraySegment))
throw new InvalidOperationException("Failed to get array segment from buffer memory.");
if (ioBehavior == IOBehavior.Synchronous)
bytesRead = stream.Read(arraySegment.Array!, arraySegment.Offset, sizeToRead);
else
bytesRead = await stream.ReadAsync(arraySegment.Array!, arraySegment.Offset, sizeToRead, cancellationToken).ConfigureAwait(false);
totalBytesRead += bytesRead;
writer.Advance(bytesRead);
} while (bytesRead > 0);
#endif

if (totalBytesRead == 0)
break;

// send StatementSendLongData; MySQL Server will keep appending the sent data to the parameter value
using var payload = writer.ToPayloadData();
await Connection!.Session.SendAsync(payload, ioBehavior, cancellationToken).ConfigureAwait(false);
}
}
}
}

var activity = NoActivity ? null : Connection!.Session.StartActivity(ActivitySourceHelper.ExecuteActivityName,
ActivitySourceHelper.DatabaseStatementTagName, CommandText);
m_commandBehavior = behavior;
return CommandExecutor.ExecuteReaderAsync(new(this), SingleCommandPayloadCreator.Instance, behavior, activity, ioBehavior, cancellationToken);
return await CommandExecutor.ExecuteReaderAsync(new(this), SingleCommandPayloadCreator.Instance, behavior, activity, ioBehavior, cancellationToken).ConfigureAwait(false);
}

public MySqlCommand Clone() => new(this);
Expand Down
8 changes: 8 additions & 0 deletions src/MySqlConnector/MySqlParameter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,10 @@ internal void AppendSqlString(ByteBufferWriter writer, StatementPreparerOptions
Debug.Assert(index == length, "index == length");
writer.Advance(index);
}
else if (Value is Stream)
{
throw new NotSupportedException($"Parameter type {Value.GetType().Name} can only be used after calling MySqlCommand.Prepare.");
}
else if (Value is bool boolValue)
{
writer.Write(boolValue ? "true"u8 : "false"u8);
Expand Down Expand Up @@ -728,6 +732,10 @@ private void AppendBinary(ByteBufferWriter writer, object value, StatementPrepar
writer.WriteLengthEncodedInteger(unchecked((ulong) streamBuffer.Count));
writer.Write(streamBuffer);
}
else if (value is Stream)
{
// do nothing; this will be sent via CommandKind.StatementSendLongData
}
else if (value is float floatValue)
{
#if NET5_0_OR_GREATER
Expand Down
1 change: 1 addition & 0 deletions src/MySqlConnector/Protocol/CommandKind.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ internal enum CommandKind
ChangeUser = 17,
StatementPrepare = 22,
StatementExecute = 23,
StatementSendLongData = 24,
ResetConnection = 31,
}
128 changes: 128 additions & 0 deletions tests/IntegrationTests/ChunkStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
namespace IntegrationTests;

internal sealed class ChunkStream : Stream
{
public ChunkStream(int dataLength, int chunkLength)
{
m_dataLength = dataLength;
m_chunkLength = chunkLength;
m_position = 0;
}

public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => false;
public override long Length => m_dataLength;
public override long Position
{
get => m_position;
set => throw new NotSupportedException();
}

public override int Read(byte[] buffer, int offset, int count)
{
if (buffer is null)
throw new ArgumentNullException(nameof(buffer));
if (offset < 0 || offset > buffer.Length)
throw new ArgumentOutOfRangeException(nameof(offset));
if (count < 0 || offset + count > buffer.Length)
throw new ArgumentOutOfRangeException(nameof(count));

return Read(buffer.AsSpan(offset, count));
}

public
#if NETSTANDARD2_1 || NETCOREAPP2_1_OR_GREATER
override
#endif
int Read(Span<byte> buffer)
{
if (m_position >= m_dataLength)
return 0;

// Read at most chunkLength bytes
var bytesToRead = Math.Min(buffer.Length, Math.Min(m_chunkLength, m_dataLength - m_position));

// Fill with dummy data (repeating pattern based on position)
for (var i = 0; i < bytesToRead; i++)
{
buffer[i] = (byte) ((m_position + i) % 256);
}

m_position += bytesToRead;
return bytesToRead;
}

public override int ReadByte()
{
Span<byte> buffer = stackalloc byte[1];
var bytesRead = Read(buffer);
return bytesRead == 0 ? -1 : buffer[0];
}

public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
if (buffer is null)
throw new ArgumentNullException(nameof(buffer));
if (offset < 0 || offset > buffer.Length)
throw new ArgumentOutOfRangeException(nameof(offset));
if (count < 0 || offset + count > buffer.Length)
throw new ArgumentOutOfRangeException(nameof(count));

if (cancellationToken.IsCancellationRequested)
return Task.FromCanceled<int>(cancellationToken);

try
{
return Task.FromResult(Read(buffer.AsSpan(offset, count)));
}
catch (Exception ex)
{
return Task.FromException<int>(ex);
}
}

public
#if NETSTANDARD2_1 || NETCOREAPP2_1_OR_GREATER
override
#endif
ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
if (cancellationToken.IsCancellationRequested)
return new(Task.FromCanceled<int>(cancellationToken));

try
{
return new(Read(buffer.Span));
}
catch (Exception ex)
{
return new(Task.FromException<int>(ex));
}
}

public override void Write(byte[] buffer, int offset, int count) =>
throw new NotSupportedException();

public override void WriteByte(byte value) =>
throw new NotSupportedException();

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
throw new NotSupportedException();

public override void SetLength(long value) =>
throw new NotSupportedException();

public override long Seek(long offset, SeekOrigin origin) =>
throw new NotSupportedException();

public override void Flush() =>
throw new NotSupportedException();

public override Task FlushAsync(CancellationToken cancellationToken) =>
throw new NotSupportedException();

private readonly int m_dataLength;
private readonly int m_chunkLength;
private int m_position;
}
52 changes: 52 additions & 0 deletions tests/IntegrationTests/InsertTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,58 @@ public void InsertMySqlDecimalAsDecimal(bool prepare)
}
#endif

[Theory]
[InlineData(1_000_000, 1024, true)]
[InlineData(1_000_000, 1024, false)]
[InlineData(1_000_000, int.MaxValue, true)]
[InlineData(1_000_000, int.MaxValue, false)]
[InlineData(0xff_fff8, 299593, true)]
[InlineData(0xff_fff8, 299593, false)]
[InlineData(0xff_fff8, 300000, true)]
[InlineData(0xff_fff8, 300000, false)]
[InlineData(0xff_fff8, int.MaxValue, true)]
[InlineData(0xff_fff8, int.MaxValue, false)]
[InlineData(0xff_fff9, int.MaxValue, true)]
[InlineData(0xff_fff9, int.MaxValue, false)]
[InlineData(0x1ff_fff0, 299593, true)]
[InlineData(0x1ff_fff0, 299593, false)]
[InlineData(0x1ff_fff0, 300000, true)]
[InlineData(0x1ff_fff0, 300000, false)]
[InlineData(15_999_999, int.MaxValue, true)]
[InlineData(15_999_999, int.MaxValue, false)]
[InlineData(16_000_000, int.MaxValue, true)]
[InlineData(16_000_000, int.MaxValue, false)]
[InlineData(16_000_001, int.MaxValue, true)]
[InlineData(16_000_001, int.MaxValue, false)]
[InlineData(31_999_999, 999_999, true)]
[InlineData(31_999_999, 1_000_000, false)]
[InlineData(32_000_000, 1_000_001, true)]
[InlineData(32_000_000, 1_000_002, false)]
[InlineData(32_000_001, 1_000_003, true)]
[InlineData(32_000_001, 1_000_004, false)]
public async Task SendLongData(int dataLength, int chunkLength, bool isAsync)
{
using MySqlConnection connection = new MySqlConnection(AppConfig.ConnectionString);
connection.Open();
connection.Execute("""
drop table if exists insert_mysql_long_data;
create table insert_mysql_long_data(rowid integer not null primary key auto_increment, value longblob);
""");

using var chunkStream = new ChunkStream(dataLength, chunkLength);

using var writeCommand = new MySqlCommand("insert into insert_mysql_long_data(value) values(@value);", connection);
writeCommand.Parameters.AddWithValue("@value", chunkStream);
writeCommand.Prepare();
if (isAsync)
await writeCommand.ExecuteNonQueryAsync().ConfigureAwait(true);
else
writeCommand.ExecuteNonQuery();

using var readLengthCommand = new MySqlCommand("select length(value) from insert_mysql_long_data order by rowid;", connection);
Assert.Equal(chunkStream.Length, readLengthCommand.ExecuteScalar());
}

[Theory]
[InlineData(false)]
[InlineData(true)]
Expand Down
Loading