Skip to content

Commit 0128b78

Browse files
committed
修复mysql数据库MySqlBulkLoader批量插入在linux上数据丢失的问题
1 parent 4bd3dc3 commit 0128b78

File tree

2 files changed

+48
-30
lines changed

2 files changed

+48
-30
lines changed

Vue.Net/VOL.Core/Dapper/SqlDapper.cs

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.Collections.Generic;
77
using System.Data;
88
using System.Data.SqlClient;
9+
using System.IO;
910
using System.Linq;
1011
using System.Linq.Expressions;
1112
using System.Text;
@@ -110,16 +111,16 @@ public T QueryFirst<T>(string cmd, object param, CommandType? commandType = null
110111
return QueryList<T>(cmd, param, commandType: commandType ?? CommandType.Text, beginTransaction: beginTransaction).FirstOrDefault();
111112
}
112113

113-
public List<dynamic> QueryDynamicList(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
114+
public List<dynamic> QueryDynamicList(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
114115
{
115116
return Execute((conn, dbTransaction) =>
116117
{
117118
return conn.Query<dynamic>(cmd, param, dbTransaction, commandType: commandType ?? CommandType.Text).ToList();
118119
}, beginTransaction);
119120
}
120-
public dynamic QueryDynamicFirst(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
121+
public dynamic QueryDynamicFirst(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
121122
{
122-
return QueryList<dynamic>(cmd, param, commandType: commandType ?? CommandType.Text, beginTransaction: beginTransaction).FirstOrDefault();
123+
return QueryList<dynamic>(cmd, param, commandType: commandType ?? CommandType.Text, beginTransaction: beginTransaction).FirstOrDefault();
123124
}
124125

125126
public object ExecuteScalar(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
@@ -282,7 +283,7 @@ public int AddRange<T>(IEnumerable<T> entities, Expression<Func<T, object>> addF
282283
return Execute<int>((conn, dbTransaction) =>
283284
{
284285
//todo pgsql待实现
285-
return conn.Execute(sql, (DBType.Name == DbCurrentType.MySql.ToString() || DBType.Name == DbCurrentType.PgSql.ToString()) ? entities.ToList() : null,dbTransaction);
286+
return conn.Execute(sql, (DBType.Name == DbCurrentType.MySql.ToString() || DBType.Name == DbCurrentType.PgSql.ToString()) ? entities.ToList() : null, dbTransaction);
286287
}, beginTransaction);
287288
}
288289

@@ -433,6 +434,7 @@ public int BulkInsert(DataTable table, string tableName, SqlBulkCopyOptions? sql
433434

434435
/// <summary>
435436
///大批量数据插入,返回成功插入行数
437+
////************(网上的示例在linux上运行批量插入就是巨坑,会丢数据,至于为什么,见MySqlBulkLoader源码)***************/
436438
/// </summary>
437439
/// <param name="connectionString">数据库连接字符串</param>
438440
/// <param name="table">数据表</param>
@@ -442,11 +444,13 @@ private int MySqlBulkInsert(DataTable table, string tableName, string fileName =
442444
if (table.Rows.Count == 0)
443445
return 0;
444446
tmpPath = tmpPath ?? FileHelper.GetCurrentDownLoadPath();
445-
fileName = fileName ?? $"{DateTime.Now.ToString("yyyyMMddHHmmss")}.csv";
447+
// fileName = fileName ?? $"{DateTime.Now.ToString("yyyyMMddHHmmss")}.csv";
446448
int insertCount = 0;
447449
string csv = DataTableToCsv(table);
448-
FileHelper.WriteFile(tmpPath, fileName, csv);
449-
string path = tmpPath + fileName;
450+
// FileHelper.WriteFile(tmpPath, fileName, csv);
451+
// string path = tmpPath + fileName;
452+
string text = $"当前行:{table.Rows.Count}";
453+
MemoryStream stream = null;
450454
try
451455
{
452456
if (Connection.State == ConnectionState.Closed)
@@ -455,17 +459,21 @@ private int MySqlBulkInsert(DataTable table, string tableName, string fileName =
455459
{
456460
MySqlBulkLoader bulk = new MySqlBulkLoader(Connection as MySqlConnection)
457461
{
458-
FieldTerminator = ",",
459-
FieldQuotationCharacter = '"',
460-
EscapeCharacter = '"',
461-
LineTerminator = "\r\n",
462-
FileName = path.ReplacePath(),
463-
NumberOfLinesToSkip = 0,
462+
LineTerminator = "\n",
464463
TableName = tableName,
465464
CharacterSet = "UTF8"
466465
};
466+
var array = Encoding.UTF8.GetBytes(csv);
467+
stream = new MemoryStream(array);
468+
// StreamReader reader = new StreamReader(stream);
469+
bulk.SourceStream = stream; //File.OpenRead(fileName);
467470
bulk.Columns.AddRange(table.Columns.Cast<DataColumn>().Select(colum => colum.ColumnName).ToList());
468471
insertCount = bulk.Load();
472+
//text = text + $",实际写入行:{insertCount}";
473+
//fileName = fileName ?? $"{DateTime.Now.ToString("yyyyMMddHHmmss")}.text";
474+
//FileHelper.WriteFile(tmpPath, fileName, text);
475+
//Console.WriteLine(text);
476+
//Console.WriteLine(insertCount);
469477
tran.Commit();
470478
}
471479
}
@@ -477,6 +485,7 @@ private int MySqlBulkInsert(DataTable table, string tableName, string fileName =
477485
{
478486
Connection?.Dispose();
479487
Connection?.Close();
488+
stream?.Dispose();
480489
}
481490
return insertCount;
482491
// File.Delete(path);
@@ -501,7 +510,7 @@ private string DataTableToCsv(DataTable table)
501510
for (int i = 0; i < table.Columns.Count; i++)
502511
{
503512
colum = table.Columns[i];
504-
if (i != 0) sb.Append(",");
513+
if (i != 0) sb.Append("\t");
505514
if (colum.DataType == typeString && row[colum].ToString().Contains(","))
506515
{
507516
sb.Append("\"" + row[colum].ToString().Replace("\"", "\"\"") + "\"");
@@ -514,7 +523,7 @@ private string DataTableToCsv(DataTable table)
514523
}
515524
else sb.Append(row[colum].ToString());
516525
}
517-
sb.AppendLine();
526+
sb.Append("\n");
518527
}
519528

520529
return sb.ToString();

开发版dev/Vue.NetCore/Vue.Net/VOL.Core/Dapper/SqlDapper.cs

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.Collections.Generic;
77
using System.Data;
88
using System.Data.SqlClient;
9+
using System.IO;
910
using System.Linq;
1011
using System.Linq.Expressions;
1112
using System.Text;
@@ -110,16 +111,16 @@ public T QueryFirst<T>(string cmd, object param, CommandType? commandType = null
110111
return QueryList<T>(cmd, param, commandType: commandType ?? CommandType.Text, beginTransaction: beginTransaction).FirstOrDefault();
111112
}
112113

113-
public List<dynamic> QueryDynamicList(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
114+
public List<dynamic> QueryDynamicList(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
114115
{
115116
return Execute((conn, dbTransaction) =>
116117
{
117118
return conn.Query<dynamic>(cmd, param, dbTransaction, commandType: commandType ?? CommandType.Text).ToList();
118119
}, beginTransaction);
119120
}
120-
public dynamic QueryDynamicFirst(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
121+
public dynamic QueryDynamicFirst(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
121122
{
122-
return QueryList<dynamic>(cmd, param, commandType: commandType ?? CommandType.Text, beginTransaction: beginTransaction).FirstOrDefault();
123+
return QueryList<dynamic>(cmd, param, commandType: commandType ?? CommandType.Text, beginTransaction: beginTransaction).FirstOrDefault();
123124
}
124125

125126
public object ExecuteScalar(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
@@ -282,7 +283,7 @@ public int AddRange<T>(IEnumerable<T> entities, Expression<Func<T, object>> addF
282283
return Execute<int>((conn, dbTransaction) =>
283284
{
284285
//todo pgsql待实现
285-
return conn.Execute(sql, (DBType.Name == DbCurrentType.MySql.ToString() || DBType.Name == DbCurrentType.PgSql.ToString()) ? entities.ToList() : null,dbTransaction);
286+
return conn.Execute(sql, (DBType.Name == DbCurrentType.MySql.ToString() || DBType.Name == DbCurrentType.PgSql.ToString()) ? entities.ToList() : null, dbTransaction);
286287
}, beginTransaction);
287288
}
288289

@@ -433,6 +434,7 @@ public int BulkInsert(DataTable table, string tableName, SqlBulkCopyOptions? sql
433434

434435
/// <summary>
435436
///大批量数据插入,返回成功插入行数
437+
////************(网上的示例在linux上运行批量插入就是巨坑,会丢数据,至于为什么,见MySqlBulkLoader源码)***************/
436438
/// </summary>
437439
/// <param name="connectionString">数据库连接字符串</param>
438440
/// <param name="table">数据表</param>
@@ -442,11 +444,13 @@ private int MySqlBulkInsert(DataTable table, string tableName, string fileName =
442444
if (table.Rows.Count == 0)
443445
return 0;
444446
tmpPath = tmpPath ?? FileHelper.GetCurrentDownLoadPath();
445-
fileName = fileName ?? $"{DateTime.Now.ToString("yyyyMMddHHmmss")}.csv";
447+
// fileName = fileName ?? $"{DateTime.Now.ToString("yyyyMMddHHmmss")}.csv";
446448
int insertCount = 0;
447449
string csv = DataTableToCsv(table);
448-
FileHelper.WriteFile(tmpPath, fileName, csv);
449-
string path = tmpPath + fileName;
450+
// FileHelper.WriteFile(tmpPath, fileName, csv);
451+
// string path = tmpPath + fileName;
452+
string text = $"当前行:{table.Rows.Count}";
453+
MemoryStream stream = null;
450454
try
451455
{
452456
if (Connection.State == ConnectionState.Closed)
@@ -455,17 +459,21 @@ private int MySqlBulkInsert(DataTable table, string tableName, string fileName =
455459
{
456460
MySqlBulkLoader bulk = new MySqlBulkLoader(Connection as MySqlConnection)
457461
{
458-
FieldTerminator = ",",
459-
FieldQuotationCharacter = '"',
460-
EscapeCharacter = '"',
461-
LineTerminator = "\r\n",
462-
FileName = path.ReplacePath(),
463-
NumberOfLinesToSkip = 0,
462+
LineTerminator = "\n",
464463
TableName = tableName,
465464
CharacterSet = "UTF8"
466465
};
466+
var array = Encoding.UTF8.GetBytes(csv);
467+
stream = new MemoryStream(array);
468+
// StreamReader reader = new StreamReader(stream);
469+
bulk.SourceStream = stream; //File.OpenRead(fileName);
467470
bulk.Columns.AddRange(table.Columns.Cast<DataColumn>().Select(colum => colum.ColumnName).ToList());
468471
insertCount = bulk.Load();
472+
//text = text + $",实际写入行:{insertCount}";
473+
//fileName = fileName ?? $"{DateTime.Now.ToString("yyyyMMddHHmmss")}.text";
474+
//FileHelper.WriteFile(tmpPath, fileName, text);
475+
//Console.WriteLine(text);
476+
//Console.WriteLine(insertCount);
469477
tran.Commit();
470478
}
471479
}
@@ -477,6 +485,7 @@ private int MySqlBulkInsert(DataTable table, string tableName, string fileName =
477485
{
478486
Connection?.Dispose();
479487
Connection?.Close();
488+
stream?.Dispose();
480489
}
481490
return insertCount;
482491
// File.Delete(path);
@@ -501,7 +510,7 @@ private string DataTableToCsv(DataTable table)
501510
for (int i = 0; i < table.Columns.Count; i++)
502511
{
503512
colum = table.Columns[i];
504-
if (i != 0) sb.Append(",");
513+
if (i != 0) sb.Append("\t");
505514
if (colum.DataType == typeString && row[colum].ToString().Contains(","))
506515
{
507516
sb.Append("\"" + row[colum].ToString().Replace("\"", "\"\"") + "\"");
@@ -514,7 +523,7 @@ private string DataTableToCsv(DataTable table)
514523
}
515524
else sb.Append(row[colum].ToString());
516525
}
517-
sb.AppendLine();
526+
sb.Append("\n");
518527
}
519528

520529
return sb.ToString();

0 commit comments

Comments
 (0)