|
| 1 | +using System.Buffers; |
1 | 2 | using System.Diagnostics.CodeAnalysis;
|
2 | 3 | using Microsoft.Extensions.Logging;
|
3 | 4 | using MySqlConnector.Core;
|
| 5 | +using MySqlConnector.Protocol; |
4 | 6 | using MySqlConnector.Protocol.Serialization;
|
5 | 7 | using MySqlConnector.Utilities;
|
6 | 8 |
|
@@ -357,15 +359,52 @@ internal async ValueTask<MySqlDataReader> ExecuteReaderAsync(CommandBehavior beh
|
357 | 359 | return await ExecuteReaderNoResetTimeoutAsync(behavior, ioBehavior, cancellationToken).ConfigureAwait(false);
|
358 | 360 | }
|
359 | 361 |
|
360 |
| - internal ValueTask<MySqlDataReader> ExecuteReaderNoResetTimeoutAsync(CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken cancellationToken) |
| 362 | + internal async ValueTask<MySqlDataReader> ExecuteReaderNoResetTimeoutAsync(CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken cancellationToken) |
361 | 363 | {
|
362 | 364 | if (!IsValid(out var exception))
|
363 |
| - return ValueTaskExtensions.FromException<MySqlDataReader>(exception); |
| 365 | + return await ValueTaskExtensions.FromException<MySqlDataReader>(exception).ConfigureAwait(false); |
| 366 | + |
| 367 | + if (((IMySqlCommand) this).TryGetPreparedStatements() is { Statements.Count: 1 } statements) |
| 368 | + { |
| 369 | + for (var i = 0; i < Parameters.Count; i++) |
| 370 | + { |
| 371 | + if (Parameters[i].Value is Stream stream) |
| 372 | + { |
| 373 | + var writer = new ByteBufferWriter(); |
| 374 | + writer.Write((byte) CommandKind.StatementSendLongData); |
| 375 | + writer.Write(statements.Statements[0].StatementId); |
| 376 | + writer.Write((ushort) i); |
| 377 | + |
| 378 | + var buffer = ArrayPool<byte>.Shared.Rent(ProtocolUtility.MaxPacketSize); |
| 379 | + var dataLength = ProtocolUtility.MaxPacketSize - 7; |
| 380 | + |
| 381 | + var bytesRead = stream.Read(buffer, 0, dataLength); |
| 382 | + writer.Write(buffer.AsSpan(0, bytesRead)); |
| 383 | + |
| 384 | + await Connection!.Session.SendAsync(writer.ToPayloadData(), ioBehavior, cancellationToken).ConfigureAwait(false); |
| 385 | + |
| 386 | + if (bytesRead == dataLength) |
| 387 | + { |
| 388 | + dataLength = ProtocolUtility.MaxPacketSize; |
| 389 | + |
| 390 | + do |
| 391 | + { |
| 392 | + bytesRead = stream.Read(buffer, 0, dataLength); |
| 393 | + |
| 394 | + writer = new ByteBufferWriter(); |
| 395 | + writer.Write(buffer.AsSpan(0, bytesRead)); |
| 396 | + await Connection!.Session.SendReplyAsync(writer.ToPayloadData(), ioBehavior, cancellationToken).ConfigureAwait(false); |
| 397 | + } |
| 398 | + while (bytesRead == dataLength); |
| 399 | + } |
| 400 | + } |
| 401 | + } |
| 402 | + } |
364 | 403 |
|
365 | 404 | var activity = NoActivity ? null : Connection!.Session.StartActivity(ActivitySourceHelper.ExecuteActivityName,
|
366 | 405 | ActivitySourceHelper.DatabaseStatementTagName, CommandText);
|
367 | 406 | m_commandBehavior = behavior;
|
368 |
| - return CommandExecutor.ExecuteReaderAsync(new(this), SingleCommandPayloadCreator.Instance, behavior, activity, ioBehavior, cancellationToken); |
| 407 | + return await CommandExecutor.ExecuteReaderAsync(new(this), SingleCommandPayloadCreator.Instance, behavior, activity, ioBehavior, cancellationToken).ConfigureAwait(false); |
369 | 408 | }
|
370 | 409 |
|
371 | 410 | public MySqlCommand Clone() => new(this);
|
|
0 commit comments