Skip to content

Commit 8c49746

Browse files
committed
Allow rows larger than 1 MiB in MySqlBulkCopy. Fixes #834
1 parent 6b8ec46 commit 8c49746

File tree

4 files changed

+119
-192
lines changed

4 files changed

+119
-192
lines changed

src/MySqlConnector/MySqlBulkCopy.cs

Lines changed: 76 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,7 @@ internal async Task SendDataReaderAsync(IOBehavior ioBehavior, CancellationToken
364364
try
365365
{
366366
var values = new object?[m_valuesEnumerator!.FieldCount];
367+
Encoder? utf8Encoder = null;
367368
while (true)
368369
{
369370
var hasMore = ioBehavior == IOBehavior.Asynchronous ?
@@ -373,46 +374,31 @@ await m_valuesEnumerator.MoveNextAsync().ConfigureAwait(false) :
373374
break;
374375

375376
m_valuesEnumerator.GetValues(values);
376-
retryRow:
377-
var startOutputIndex = outputIndex;
378-
var wroteRow = true;
379-
var shouldAppendSeparator = false;
380-
foreach (var value in values)
377+
for (var valueIndex = 0; valueIndex < values.Length; valueIndex++)
381378
{
382-
if (shouldAppendSeparator)
379+
if (valueIndex > 0)
383380
buffer[outputIndex++] = (byte) '\t';
384-
else
385-
shouldAppendSeparator = true;
386381

387-
if (outputIndex >= maxLength || !WriteValue(m_connection, value, buffer.AsSpan(0, maxLength).Slice(outputIndex), out var bytesWritten))
382+
var inputIndex = 0;
383+
var bytesWritten = 0;
384+
while (outputIndex >= maxLength || !WriteValue(m_connection, values[valueIndex], ref inputIndex, ref utf8Encoder, buffer.AsSpan(0, maxLength).Slice(outputIndex), out bytesWritten))
388385
{
389-
wroteRow = false;
390-
break;
386+
var payload = new PayloadData(new ArraySegment<byte>(buffer, 0, outputIndex + bytesWritten));
387+
await m_connection.Session.SendReplyAsync(payload, ioBehavior, cancellationToken).ConfigureAwait(false);
388+
outputIndex = 0;
389+
bytesWritten = 0;
391390
}
392391
outputIndex += bytesWritten;
393392
}
393+
buffer[outputIndex++] = (byte) '\n';
394394

395-
if (!wroteRow)
396-
{
397-
if (startOutputIndex == 0)
398-
throw new NotSupportedException("Total row length must be less than 1 MiB.");
399-
var payload = new PayloadData(new ArraySegment<byte>(buffer, 0, startOutputIndex));
400-
await m_connection.Session.SendReplyAsync(payload, ioBehavior, cancellationToken).ConfigureAwait(false);
401-
outputIndex = 0;
402-
goto retryRow;
403-
}
404-
else
395+
RowsCopied++;
396+
if (eventArgs is not null && RowsCopied % NotifyAfter == 0)
405397
{
406-
buffer[outputIndex++] = (byte) '\n';
407-
408-
RowsCopied++;
409-
if (eventArgs is not null && RowsCopied % NotifyAfter == 0)
410-
{
411-
eventArgs.RowsCopied = RowsCopied;
412-
MySqlRowsCopied!(this, eventArgs);
413-
if (eventArgs.Abort)
414-
break;
415-
}
398+
eventArgs.RowsCopied = RowsCopied;
399+
MySqlRowsCopied!(this, eventArgs);
400+
if (eventArgs.Abort)
401+
break;
416402
}
417403
}
418404

@@ -428,8 +414,14 @@ await m_valuesEnumerator.MoveNextAsync().ConfigureAwait(false) :
428414
m_wasAborted = eventArgs?.Abort ?? false;
429415
}
430416

431-
static bool WriteValue(MySqlConnection connection, object? value, Span<byte> output, out int bytesWritten)
417+
static bool WriteValue(MySqlConnection connection, object? value, ref int inputIndex, ref Encoder? utf8Encoder, Span<byte> output, out int bytesWritten)
432418
{
419+
if (output.Length == 0)
420+
{
421+
bytesWritten = 0;
422+
return false;
423+
}
424+
433425
if (value is null || value == DBNull.Value)
434426
{
435427
if (output.Length < EscapedNull.Length)
@@ -443,11 +435,11 @@ static bool WriteValue(MySqlConnection connection, object? value, Span<byte> out
443435
}
444436
else if (value is string stringValue)
445437
{
446-
return WriteString(stringValue, output, out bytesWritten);
438+
return WriteSubstring(stringValue, ref inputIndex, ref utf8Encoder, output, out bytesWritten);
447439
}
448440
else if (value is char charValue)
449441
{
450-
return WriteString(charValue.ToString(), output, out bytesWritten);
442+
return WriteString(charValue.ToString(), ref utf8Encoder, output, out bytesWritten);
451443
}
452444
else if (value is byte byteValue)
453445
{
@@ -493,7 +485,7 @@ static bool WriteValue(MySqlConnection connection, object? value, Span<byte> out
493485
value is MySqlGeometry geometry ? geometry.ValueSpan :
494486
((ReadOnlyMemory<byte>) value).Span;
495487

496-
return WriteBytes(inputSpan, output, out bytesWritten);
488+
return WriteBytes(inputSpan, ref inputIndex, output, out bytesWritten);
497489
}
498490
else if (value is bool boolValue)
499491
{
@@ -509,14 +501,14 @@ static bool WriteValue(MySqlConnection connection, object? value, Span<byte> out
509501
else if (value is float || value is double)
510502
{
511503
// NOTE: Utf8Formatter doesn't support "R"
512-
return WriteString("{0:R}".FormatInvariant(value), output, out bytesWritten);
504+
return WriteString("{0:R}".FormatInvariant(value), ref utf8Encoder, output, out bytesWritten);
513505
}
514506
else if (value is MySqlDateTime mySqlDateTimeValue)
515507
{
516508
if (mySqlDateTimeValue.IsValidDateTime)
517-
return WriteString("{0:yyyy'-'MM'-'dd' 'HH':'mm':'ss'.'ffffff}".FormatInvariant(mySqlDateTimeValue.GetDateTime()), output, out bytesWritten);
509+
return WriteString("{0:yyyy'-'MM'-'dd' 'HH':'mm':'ss'.'ffffff}".FormatInvariant(mySqlDateTimeValue.GetDateTime()), ref utf8Encoder, output, out bytesWritten);
518510
else
519-
return WriteString("0000-00-00", output, out bytesWritten);
511+
return WriteString("0000-00-00", ref utf8Encoder, output, out bytesWritten);
520512
}
521513
else if (value is DateTime dateTimeValue)
522514
{
@@ -525,12 +517,12 @@ static bool WriteValue(MySqlConnection connection, object? value, Span<byte> out
525517
else if (connection.DateTimeKind == DateTimeKind.Local && dateTimeValue.Kind == DateTimeKind.Utc)
526518
throw new MySqlException("DateTime.Kind must not be Utc when DateTimeKind setting is Local");
527519

528-
return WriteString("{0:yyyy'-'MM'-'dd' 'HH':'mm':'ss'.'ffffff}".FormatInvariant(dateTimeValue), output, out bytesWritten);
520+
return WriteString("{0:yyyy'-'MM'-'dd' 'HH':'mm':'ss'.'ffffff}".FormatInvariant(dateTimeValue), ref utf8Encoder, output, out bytesWritten);
529521
}
530522
else if (value is DateTimeOffset dateTimeOffsetValue)
531523
{
532524
// store as UTC as it will be read as such when deserialized from a timestamp column
533-
return WriteString("{0:yyyy'-'MM'-'dd' 'HH':'mm':'ss'.'ffffff}".FormatInvariant(dateTimeOffsetValue.UtcDateTime), output, out bytesWritten);
525+
return WriteString("{0:yyyy'-'MM'-'dd' 'HH':'mm':'ss'.'ffffff}".FormatInvariant(dateTimeOffsetValue.UtcDateTime), ref utf8Encoder, output, out bytesWritten);
534526
}
535527
else if (value is TimeSpan ts)
536528
{
@@ -540,7 +532,7 @@ static bool WriteValue(MySqlConnection connection, object? value, Span<byte> out
540532
isNegative = true;
541533
ts = TimeSpan.FromTicks(-ts.Ticks);
542534
}
543-
return WriteString("{0}{1}:{2:mm':'ss'.'ffffff}'".FormatInvariant(isNegative ? "-" : "", ts.Days * 24 + ts.Hours, ts), output, out bytesWritten);
535+
return WriteString("{0}{1}:{2:mm':'ss'.'ffffff}'".FormatInvariant(isNegative ? "-" : "", ts.Days * 24 + ts.Hours, ts), ref utf8Encoder, output, out bytesWritten);
544536
}
545537
else if (value is Guid guidValue)
546538
{
@@ -566,7 +558,7 @@ static bool WriteValue(MySqlConnection connection, object? value, Span<byte> out
566558
Utility.SwapBytes(bytes, 1, 3);
567559
}
568560
}
569-
return WriteBytes(bytes, output, out bytesWritten);
561+
return WriteBytes(bytes, ref inputIndex, output, out bytesWritten);
570562
}
571563
else
572564
{
@@ -576,72 +568,80 @@ static bool WriteValue(MySqlConnection connection, object? value, Span<byte> out
576568
}
577569
else if (value is Enum)
578570
{
579-
return WriteString("{0:d}".FormatInvariant(value), output, out bytesWritten);
571+
return WriteString("{0:d}".FormatInvariant(value), ref utf8Encoder, output, out bytesWritten);
580572
}
581573
else
582574
{
583575
throw new NotSupportedException("Type {0} not currently supported. Value: {1}".FormatInvariant(value.GetType().Name, value));
584576
}
585577
}
586578

587-
static bool WriteString(string value, Span<byte> output, out int bytesWritten)
579+
static bool WriteString(string value, ref Encoder? utf8Encoder, Span<byte> output, out int bytesWritten)
580+
{
581+
var inputIndex = 0;
582+
if (WriteSubstring(value, ref inputIndex, ref utf8Encoder, output, out bytesWritten))
583+
return true;
584+
bytesWritten = 0;
585+
return false;
586+
}
587+
588+
// Writes as much of 'value' as possible, starting at 'inputIndex' and writing UTF-8-encoded bytes to 'output'.
589+
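// 'inputIndex' will be updated to the next character to be written, and 'bytesWritten' will be set to the number of bytes written to 'output'.
// Returns true once all of 'value' has been written; returns false if 'output' ran out of room first, so the caller can flush and call again with the same 'inputIndex'.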
590+
static bool WriteSubstring(string value, ref int inputIndex, ref Encoder? utf8Encoder, Span<byte> output, out int bytesWritten)
588591
{
589-
var index = 0;
590592
bytesWritten = 0;
591-
while (index < value.Length)
593+
while (inputIndex < value.Length)
592594
{
593-
if (Array.IndexOf(s_specialCharacters, value[index]) != -1)
595+
if (Array.IndexOf(s_specialCharacters, value[inputIndex]) != -1)
594596
{
595-
if (output.Length < 2)
596-
{
597-
bytesWritten = 0;
597+
if (output.Length <= 2)
598598
return false;
599-
}
600-
599+
601600
output[0] = (byte) '\\';
602-
output[1] = (byte) value[index];
601+
output[1] = (byte) value[inputIndex];
603602
output = output.Slice(2);
604603
bytesWritten += 2;
605-
index++;
604+
inputIndex++;
606605
}
607606
else
608607
{
609-
var nextIndex = value.IndexOfAny(s_specialCharacters, index);
608+
var nextIndex = value.IndexOfAny(s_specialCharacters, inputIndex);
610609
if (nextIndex == -1)
611610
nextIndex = value.Length;
612-
var encodedSize = Encoding.UTF8.GetByteCount(value.AsSpan(index, nextIndex - index));
613-
if (encodedSize > output.Length)
614-
{
615-
bytesWritten = 0;
611+
612+
utf8Encoder ??= Encoding.UTF8.GetEncoder();
613+
#if NETSTANDARD1_3
614+
var buffer = new byte[output.Length];
615+
utf8Encoder.Convert(value.ToCharArray(), inputIndex, nextIndex - inputIndex, buffer, 0, buffer.Length, nextIndex == value.Length, out var charsUsed, out var bytesUsed, out var completed);
616+
buffer.AsSpan().CopyTo(output);
617+
#else
618+
utf8Encoder.Convert(value.AsSpan(inputIndex, nextIndex - inputIndex), output, nextIndex == value.Length, out var charsUsed, out var bytesUsed, out var completed);
619+
#endif
620+
621+
bytesWritten += bytesUsed;
622+
output = output.Slice(bytesUsed);
623+
inputIndex += charsUsed;
624+
625+
if (!completed)
616626
return false;
617-
}
618-
var encodedBytesWritten = Encoding.UTF8.GetBytes(value.AsSpan(index, nextIndex - index), output);
619-
bytesWritten += encodedBytesWritten;
620-
output = output.Slice(encodedBytesWritten);
621-
index = nextIndex;
622627
}
623628
}
624629

625630
return true;
626631
}
627632

628-
static bool WriteBytes(ReadOnlySpan<byte> value, Span<byte> output, out int bytesWritten)
633+
static bool WriteBytes(ReadOnlySpan<byte> value, ref int inputIndex, Span<byte> output, out int bytesWritten)
629634
{
630-
if (output.Length < value.Length * 2)
631-
{
632-
bytesWritten = 0;
633-
return false;
634-
}
635-
636-
foreach (var by in value)
635+
bytesWritten = 0;
636+
for (; inputIndex < value.Length && output.Length > 2; inputIndex++)
637637
{
638-
WriteNibble(by >> 4, output);
639-
WriteNibble(by & 0xF, output.Slice(1));
638+
WriteNibble(value[inputIndex] >> 4, output);
639+
WriteNibble(value[inputIndex] & 0xF, output.Slice(1));
640640
output = output.Slice(2);
641+
bytesWritten += 2;
641642
}
642643

643-
bytesWritten = value.Length * 2;
644-
return true;
644+
return inputIndex == value.Length;
645645
}
646646

647647
// Writes one lowercase hexadecimal ASCII digit for 'value' into output[0]:
// values below 10 become '0'-'9' (value + 0x30); other values are offset by 0x57, so 10-15 become 'a'-'f'.
// Callers in WriteBytes pass a byte's high nibble (>> 4) and low nibble (& 0xF), i.e. values in the range 0-15.
static void WriteNibble(int value, Span<byte> output) => output[0] = value < 10 ? (byte) (value + 0x30) : (byte) (value + 0x57);

src/MySqlConnector/Utilities/Utility.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public static unsafe int GetBytes(this Encoding encoding, ReadOnlySpan<char> cha
6565
}
6666
#endif
6767

68-
#if NET461 || NET471 || NETSTANDARD2_0
68+
#if NET45 || NET461 || NET471 || NETSTANDARD2_0
6969
public static unsafe void Convert(this Encoder encoder, ReadOnlySpan<char> chars, Span<byte> bytes, bool flush, out int charsUsed, out int bytesUsed, out bool completed)
7070
{
7171
fixed (char* charsPtr = &MemoryMarshal.GetReference(chars))

tests/SideBySide/BulkLoaderAsync.cs

Lines changed: 1 addition & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -477,47 +477,6 @@ public async Task BulkCopyDataTableWithLongData()
477477
await bulkCopy.WriteToServerAsync(dataTable);
478478
}
479479

480-
[Fact]
481-
public async Task BulkCopyDataTableWithTooLongData()
482-
{
483-
var dataTable = new DataTable()
484-
{
485-
Columns =
486-
{
487-
new DataColumn("data", typeof(byte[])),
488-
},
489-
Rows =
490-
{
491-
new object[] { new byte[524300] },
492-
}
493-
};
494-
495-
using var connection = new MySqlConnection(GetLocalConnectionString());
496-
await connection.OpenAsync();
497-
using (var cmd = new MySqlCommand(@"drop table if exists bulk_load_data_table;
498-
create table bulk_load_data_table(a int, b longblob);", connection))
499-
{
500-
await cmd.ExecuteNonQueryAsync();
501-
}
502-
503-
var bulkCopy = new MySqlBulkCopy(connection)
504-
{
505-
DestinationTableName = "bulk_load_data_table",
506-
ColumnMappings =
507-
{
508-
new(0, "b"),
509-
}
510-
};
511-
try
512-
{
513-
await bulkCopy.WriteToServerAsync(dataTable);
514-
Assert.True(false, "Expected exception wasn't thrown");
515-
}
516-
catch (MySqlException ex) when (ex.InnerException?.InnerException is NotSupportedException)
517-
{
518-
}
519-
}
520-
521480
[Theory]
522481
[InlineData(0, 15, 0, 0)]
523482
[InlineData(5, 15, 3, 15)]
@@ -563,7 +522,7 @@ public async Task BulkCopyNotifyAfter(int notifyAfter, int rowCount, int expecte
563522
[Theory]
564523
[InlineData(0, 40, 0, 0, 0, 40)]
565524
[InlineData(5, 40, 15, 3, 15, 0)]
566-
[InlineData(5, 40, 20, 4, 20, 16)]
525+
[InlineData(5, 40, 20, 4, 20, 17)]
567526
[InlineData(int.MaxValue, 20, 0, 0, 0, 20)]
568527
public async Task BulkCopyAbort(int notifyAfter, int rowCount, int abortAfter, int expectedEventCount, int expectedRowsCopied, long expectedCount)
569528
{

0 commit comments

Comments
 (0)