Skip to content

Commit cd3d7cd

Browse files
committed
Use "ambient" transaction in bulk loader. Fixes #300
1 parent 6bf912d commit cd3d7cd

File tree

3 files changed

+148
-3
lines changed

3 files changed

+148
-3
lines changed

src/MySqlConnector/MySqlClient/MySqlBulkLoader.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
using System.Text;
66
using System.Threading;
77
using System.Threading.Tasks;
8-
using MySql.Data.Serialization;
98
using MySql.Data.Protocol.Serialization;
109

1110
namespace MySql.Data.MySqlClient
@@ -23,6 +22,7 @@ public class MySqlBulkLoader
2322
public List<string> Columns { get; }
2423
public MySqlBulkLoaderConflictOption ConflictOption { get; set; }
2524
public MySqlConnection Connection { get; set; }
25+
public MySqlTransaction Transaction { get; set; }
2626
public char EscapeCharacter { get; set; }
2727
public List<string> Expressions { get; }
2828
public char FieldQuotationCharacter { get; set; }
@@ -60,6 +60,7 @@ public MySqlBulkLoader(MySqlConnection connection)
6060
ConflictOption = MySqlBulkLoaderConflictOption.None;
6161
Columns = new List<string>();
6262
Expressions = new List<string>();
63+
Transaction = connection.CurrentTransaction;
6364
}
6465

6566
private string BuildSqlCommand()
@@ -182,9 +183,9 @@ private async Task<int> LoadAsync(IOBehavior ioBehavior, CancellationToken cance
182183
try
183184
{
184185
var commandString = BuildSqlCommand();
185-
var cmd = new MySqlCommand(commandString, Connection)
186+
var cmd = new MySqlCommand(commandString, Connection, Transaction)
186187
{
187-
CommandTimeout = Timeout
188+
CommandTimeout = Timeout,
188189
};
189190
return await cmd.ExecuteNonQueryAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
190191
}

tests/SideBySide/BulkLoaderAsync.cs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,78 @@ public async Task BulkLoadLocalCsvFileNotFound()
208208
};
209209
}
210210

211+
[Fact]
212+
public async Task BulkLoadLocalCsvFileInTransactionWithCommit()
213+
{
214+
try
215+
{
216+
await m_database.Connection.OpenAsync();
217+
using (var transaction = m_database.Connection.BeginTransaction())
218+
{
219+
var bulkLoader = new MySqlBulkLoader(m_database.Connection)
220+
{
221+
FileName = AppConfig.MySqlBulkLoaderLocalCsvFile,
222+
TableName = m_testTable,
223+
CharacterSet = "UTF8",
224+
NumberOfLinesToSkip = 1,
225+
FieldTerminator = ",",
226+
FieldQuotationCharacter = '"',
227+
FieldQuotationOptional = true,
228+
Local = true,
229+
};
230+
bulkLoader.Expressions.Add("five = UNHEX(five)");
231+
bulkLoader.Columns.AddRange(new[] { "one", "two", "three", "four", "five" });
232+
233+
var rowCount = await bulkLoader.LoadAsync();
234+
Assert.Equal(20, rowCount);
235+
236+
transaction.Commit();
237+
}
238+
239+
Assert.Equal(20, await m_database.Connection.ExecuteScalarAsync<int>($@"select count(*) from {m_testTable};"));
240+
}
241+
finally
242+
{
243+
m_database.Connection.Close();
244+
}
245+
}
246+
247+
[Fact]
248+
public async Task BulkLoadLocalCsvFileInTransactionWithRollback()
249+
{
250+
try
251+
{
252+
await m_database.Connection.OpenAsync();
253+
using (var transaction = m_database.Connection.BeginTransaction())
254+
{
255+
var bulkLoader = new MySqlBulkLoader(m_database.Connection)
256+
{
257+
FileName = AppConfig.MySqlBulkLoaderLocalCsvFile,
258+
TableName = m_testTable,
259+
CharacterSet = "UTF8",
260+
NumberOfLinesToSkip = 1,
261+
FieldTerminator = ",",
262+
FieldQuotationCharacter = '"',
263+
FieldQuotationOptional = true,
264+
Local = true,
265+
};
266+
bulkLoader.Expressions.Add("five = UNHEX(five)");
267+
bulkLoader.Columns.AddRange(new[] { "one", "two", "three", "four", "five" });
268+
269+
var rowCount = await bulkLoader.LoadAsync();
270+
Assert.Equal(20, rowCount);
271+
272+
transaction.Rollback();
273+
}
274+
275+
Assert.Equal(0, await m_database.Connection.ExecuteScalarAsync<int>($@"select count(*) from {m_testTable};"));
276+
}
277+
finally
278+
{
279+
m_database.Connection.Close();
280+
}
281+
}
282+
211283
[Fact]
212284
public async Task BulkLoadMissingFileName()
213285
{

tests/SideBySide/BulkLoaderSync.cs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,78 @@ public void BulkLoadLocalCsvFileNotFound()
210210
};
211211
}
212212

213+
[Fact]
214+
public void BulkLoadLocalCsvFileInTransactionWithCommit()
215+
{
216+
try
217+
{
218+
m_database.Connection.Open();
219+
using (var transaction = m_database.Connection.BeginTransaction())
220+
{
221+
var bulkLoader = new MySqlBulkLoader(m_database.Connection)
222+
{
223+
FileName = AppConfig.MySqlBulkLoaderLocalCsvFile,
224+
TableName = m_testTable,
225+
CharacterSet = "UTF8",
226+
NumberOfLinesToSkip = 1,
227+
FieldTerminator = ",",
228+
FieldQuotationCharacter = '"',
229+
FieldQuotationOptional = true,
230+
Local = true,
231+
};
232+
bulkLoader.Expressions.Add("five = UNHEX(five)");
233+
bulkLoader.Columns.AddRange(new[] { "one", "two", "three", "four", "five" });
234+
235+
var rowCount = bulkLoader.Load();
236+
Assert.Equal(20, rowCount);
237+
238+
transaction.Commit();
239+
}
240+
241+
Assert.Equal(20, m_database.Connection.ExecuteScalar<int>($@"select count(*) from {m_testTable};"));
242+
}
243+
finally
244+
{
245+
m_database.Connection.Close();
246+
}
247+
}
248+
249+
[Fact]
250+
public void BulkLoadLocalCsvFileInTransactionWithRollback()
251+
{
252+
try
253+
{
254+
m_database.Connection.Open();
255+
using (var transaction = m_database.Connection.BeginTransaction())
256+
{
257+
var bulkLoader = new MySqlBulkLoader(m_database.Connection)
258+
{
259+
FileName = AppConfig.MySqlBulkLoaderLocalCsvFile,
260+
TableName = m_testTable,
261+
CharacterSet = "UTF8",
262+
NumberOfLinesToSkip = 1,
263+
FieldTerminator = ",",
264+
FieldQuotationCharacter = '"',
265+
FieldQuotationOptional = true,
266+
Local = true,
267+
};
268+
bulkLoader.Expressions.Add("five = UNHEX(five)");
269+
bulkLoader.Columns.AddRange(new[] { "one", "two", "three", "four", "five" });
270+
271+
var rowCount = bulkLoader.Load();
272+
Assert.Equal(20, rowCount);
273+
274+
transaction.Rollback();
275+
}
276+
277+
Assert.Equal(0, m_database.Connection.ExecuteScalar<int>($@"select count(*) from {m_testTable};"));
278+
}
279+
finally
280+
{
281+
m_database.Connection.Close();
282+
}
283+
}
284+
213285
[Fact]
214286
public void BulkLoadMissingFileName()
215287
{

0 commit comments

Comments
 (0)