Skip to content

Commit b7bf205

Browse files
committed
Implement MySqlBulkCopy.RowsCopied event. Fixes #769
1 parent 4ba7b66 commit b7bf205

File tree

4 files changed

+237
-1
lines changed

4 files changed

+237
-1
lines changed

src/MySqlConnector/MySql.Data.MySqlClient/MySqlBulkCopy.cs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,20 @@ public MySqlBulkCopy(MySqlConnection connection, MySqlTransaction? transaction =
2626

2727
public string? DestinationTableName { get; set; }
2828

29+
/// <summary>
30+
/// Defines the number of rows to be processed before generating a notification event.
31+
/// </summary>
32+
public int NotifyAfter { get; set; }
33+
34+
/// <summary>
35+
/// Occurs every time that the number of rows specified by the <see cref="NotifyAfter"/> property have been processed,
36+
/// and once after all rows have been copied (if <see cref="NotifyAfter"/> is non-zero).
37+
/// </summary>
38+
/// <remarks>
39+
/// Receipt of a RowsCopied event does not imply that any rows have been sent to the server or committed.
40+
/// </remarks>
41+
public event MySqlRowsCopiedEventHandler? RowsCopied;
42+
2943
#if !NETSTANDARD1_3
3044
public void WriteToServer(DataTable dataTable)
3145
{
@@ -167,6 +181,12 @@ internal async Task SendDataReaderAsync(IOBehavior ioBehavior, CancellationToken
167181
var buffer = ArrayPool<byte>.Shared.Rent(maxLength + 1);
168182
var outputIndex = 0;
169183

184+
// allocate a reusable MySqlRowsCopiedEventArgs if event notification is necessary
185+
var rowsCopied = 0;
186+
MySqlRowsCopiedEventArgs? eventArgs = null;
187+
if (NotifyAfter > 0 && RowsCopied is object)
188+
eventArgs = new MySqlRowsCopiedEventArgs();
189+
170190
try
171191
{
172192
var values = new object?[m_valuesEnumerator!.FieldCount];
@@ -210,14 +230,30 @@ await m_valuesEnumerator.MoveNextAsync().ConfigureAwait(false) :
210230
else
211231
{
212232
buffer[outputIndex++] = (byte) '\n';
233+
234+
rowsCopied++;
235+
if (eventArgs is object && rowsCopied % NotifyAfter == 0)
236+
{
237+
eventArgs.RowsCopied = rowsCopied;
238+
RowsCopied!(this, eventArgs);
239+
if (eventArgs.Abort)
240+
break;
241+
}
213242
}
214243
}
215244

216-
if (outputIndex != 0)
245+
if (outputIndex != 0 && !(eventArgs?.Abort ?? false))
217246
{
218247
var payload2 = new PayloadData(new ArraySegment<byte>(buffer, 0, outputIndex));
219248
await m_connection.Session.SendReplyAsync(payload2, ioBehavior, cancellationToken).ConfigureAwait(false);
220249
}
250+
251+
// send final RowsCopied event (if it wasn't already sent)
252+
if (eventArgs is object && rowsCopied % NotifyAfter != 0)
253+
{
254+
eventArgs.RowsCopied = rowsCopied;
255+
RowsCopied!(this, eventArgs);
256+
}
221257
}
222258
finally
223259
{
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
using System;
2+
3+
namespace MySql.Data.MySqlClient
4+
{
5+
public sealed class MySqlRowsCopiedEventArgs : EventArgs
6+
{
7+
/// <summary>
8+
/// Gets or sets a value that indicates whether the bulk copy operation should be aborted.
9+
/// </summary>
10+
public bool Abort { get; set; }
11+
12+
/// <summary>
13+
/// Gets a value that returns the number of rows copied during the current bulk copy operation.
14+
/// </summary>
15+
public long RowsCopied { get; internal set; }
16+
17+
internal MySqlRowsCopiedEventArgs()
18+
{
19+
}
20+
}
21+
22+
/// <summary>
23+
/// Represents the method that handles the <see cref="MySqlBulkCopy.RowsCopied"/> event of a <see cref="MySqlBulkCopy"/>.
24+
/// </summary>
25+
public delegate void MySqlRowsCopiedEventHandler(object sender, MySqlRowsCopiedEventArgs e);
26+
}

tests/SideBySide/BulkLoaderAsync.cs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Data;
33
using System.Data.Common;
44
using System.IO;
5+
using System.Linq;
56
using System.Threading.Tasks;
67
using MySql.Data.MySqlClient;
78
using Xunit;
@@ -501,6 +502,92 @@ public async Task BulkLoadDataTableWithTooLongData()
501502
};
502503
await Assert.ThrowsAsync<MySqlException>(async () => await bulkCopy.WriteToServerAsync(dataTable));
503504
}
505+
506+
[Theory]
507+
[InlineData(0, 15, 0, 0)]
508+
[InlineData(5, 15, 3, 15)]
509+
[InlineData(5, 16, 4, 16)]
510+
[InlineData(int.MaxValue, 15, 1, 15)]
511+
public async Task BulkCopyNotifyAfter(int notifyAfter, int rowCount, int expectedEventCount, int expectedRowsCopied)
512+
{
513+
using var connection = new MySqlConnection(GetLocalConnectionString());
514+
await connection.OpenAsync();
515+
using (var cmd = new MySqlCommand(@"drop table if exists bulk_copy_notify_after;
516+
create table bulk_copy_notify_after(value int);", connection))
517+
{
518+
await cmd.ExecuteNonQueryAsync();
519+
}
520+
521+
var bulkCopy = new MySqlBulkCopy(connection)
522+
{
523+
NotifyAfter = notifyAfter,
524+
DestinationTableName = "bulk_copy_notify_after",
525+
};
526+
int eventCount = 0;
527+
long rowsCopied = 0;
528+
bulkCopy.RowsCopied += (s, e) =>
529+
{
530+
eventCount++;
531+
rowsCopied = e.RowsCopied;
532+
};
533+
534+
var dataTable = new DataTable()
535+
{
536+
Columns = { new DataColumn("value", typeof(int)) },
537+
};
538+
foreach (var x in Enumerable.Range(1, rowCount))
539+
dataTable.Rows.Add(new object[] { x });
540+
541+
await bulkCopy.WriteToServerAsync(dataTable);
542+
Assert.Equal(expectedEventCount, eventCount);
543+
Assert.Equal(expectedRowsCopied, rowsCopied);
544+
}
545+
546+
[Theory]
547+
[InlineData(0, 40, 0, 0, 0, 40)]
548+
[InlineData(5, 40, 15, 3, 15, 0)]
549+
[InlineData(5, 40, 20, 4, 20, 16)]
550+
[InlineData(int.MaxValue, 20, 0, 1, 20, 20)]
551+
public async Task BulkCopyAbort(int notifyAfter, int rowCount, int abortAfter, int expectedEventCount, int expectedRowsCopied, long expectedCount)
552+
{
553+
using var connection = new MySqlConnection(GetLocalConnectionString());
554+
await connection.OpenAsync();
555+
using (var cmd = new MySqlCommand(@"drop table if exists bulk_copy_abort;
556+
create table bulk_copy_abort(value longtext);", connection))
557+
{
558+
await cmd.ExecuteNonQueryAsync();
559+
}
560+
561+
var bulkCopy = new MySqlBulkCopy(connection)
562+
{
563+
NotifyAfter = notifyAfter,
564+
DestinationTableName = "bulk_copy_abort",
565+
};
566+
int eventCount = 0;
567+
long rowsCopied = 0;
568+
bulkCopy.RowsCopied += (s, e) =>
569+
{
570+
eventCount++;
571+
rowsCopied = e.RowsCopied;
572+
if (e.RowsCopied >= abortAfter)
573+
e.Abort = true;
574+
};
575+
576+
var dataTable = new DataTable()
577+
{
578+
Columns = { new DataColumn("value", typeof(string)) },
579+
};
580+
var str = new string('a', 1_000_000);
581+
foreach (var x in Enumerable.Range(1, rowCount))
582+
dataTable.Rows.Add(new object[] { str });
583+
584+
await bulkCopy.WriteToServerAsync(dataTable);
585+
Assert.Equal(expectedEventCount, eventCount);
586+
Assert.Equal(expectedRowsCopied, rowsCopied);
587+
588+
using (var cmd = new MySqlCommand("select count(value) from bulk_copy_abort;", connection))
589+
Assert.Equal(expectedCount, await cmd.ExecuteScalarAsync());
590+
}
504591
#endif
505592

506593
[Fact]

tests/SideBySide/BulkLoaderSync.cs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Data;
33
using System.Data.Common;
44
using System.IO;
5+
using System.Linq;
56
using MySql.Data.MySqlClient;
67
using Xunit;
78
using Dapper;
@@ -697,6 +698,92 @@ public void BulkLoadDataTableWithTooLongString()
697698
};
698699
Assert.Throws<MySqlException>(() => bulkCopy.WriteToServer(dataTable));
699700
}
701+
702+
[Theory]
703+
[InlineData(0, 15, 0, 0)]
704+
[InlineData(5, 15, 3, 15)]
705+
[InlineData(5, 16, 4, 16)]
706+
[InlineData(int.MaxValue, 15, 1, 15)]
707+
public void BulkCopyNotifyAfter(int notifyAfter, int rowCount, int expectedEventCount, int expectedRowsCopied)
708+
{
709+
using var connection = new MySqlConnection(GetLocalConnectionString());
710+
connection.Open();
711+
using (var cmd = new MySqlCommand(@"drop table if exists bulk_copy_notify_after;
712+
create table bulk_copy_notify_after(value int);", connection))
713+
{
714+
cmd.ExecuteNonQuery();
715+
}
716+
717+
var bulkCopy = new MySqlBulkCopy(connection)
718+
{
719+
NotifyAfter = notifyAfter,
720+
DestinationTableName = "bulk_copy_notify_after",
721+
};
722+
int eventCount = 0;
723+
long rowsCopied = 0;
724+
bulkCopy.RowsCopied += (s, e) =>
725+
{
726+
eventCount++;
727+
rowsCopied = e.RowsCopied;
728+
};
729+
730+
var dataTable = new DataTable()
731+
{
732+
Columns = { new DataColumn("value", typeof(int)) },
733+
};
734+
foreach (var x in Enumerable.Range(1, rowCount))
735+
dataTable.Rows.Add(new object[] { x });
736+
737+
bulkCopy.WriteToServer(dataTable);
738+
Assert.Equal(expectedEventCount, eventCount);
739+
Assert.Equal(expectedRowsCopied, rowsCopied);
740+
}
741+
742+
[Theory]
743+
[InlineData(0, 40, 0, 0, 0, 40)]
744+
[InlineData(5, 40, 15, 3, 15, 0)]
745+
[InlineData(5, 40, 20, 4, 20, 16)]
746+
[InlineData(int.MaxValue, 20, 0, 1, 20, 20)]
747+
public void BulkCopyAbort(int notifyAfter, int rowCount, int abortAfter, int expectedEventCount, int expectedRowsCopied, long expectedCount)
748+
{
749+
using var connection = new MySqlConnection(GetLocalConnectionString());
750+
connection.Open();
751+
using (var cmd = new MySqlCommand(@"drop table if exists bulk_copy_abort;
752+
create table bulk_copy_abort(value longtext);", connection))
753+
{
754+
cmd.ExecuteNonQuery();
755+
}
756+
757+
var bulkCopy = new MySqlBulkCopy(connection)
758+
{
759+
NotifyAfter = notifyAfter,
760+
DestinationTableName = "bulk_copy_abort",
761+
};
762+
int eventCount = 0;
763+
long rowsCopied = 0;
764+
bulkCopy.RowsCopied += (s, e) =>
765+
{
766+
eventCount++;
767+
rowsCopied = e.RowsCopied;
768+
if (e.RowsCopied >= abortAfter)
769+
e.Abort = true;
770+
};
771+
772+
var dataTable = new DataTable()
773+
{
774+
Columns = { new DataColumn("value", typeof(string)) },
775+
};
776+
var str = new string('a', 1_000_000);
777+
foreach (var x in Enumerable.Range(1, rowCount))
778+
dataTable.Rows.Add(new object[] { str });
779+
780+
bulkCopy.WriteToServer(dataTable);
781+
Assert.Equal(expectedEventCount, eventCount);
782+
Assert.Equal(expectedRowsCopied, rowsCopied);
783+
784+
using (var cmd = new MySqlCommand("select count(value) from bulk_copy_abort;", connection))
785+
Assert.Equal(expectedCount, cmd.ExecuteScalar());
786+
}
700787
#endif
701788

702789
[Fact]

0 commit comments

Comments
 (0)