Skip to content

Commit 32ea064

Browse files
committed
- 添加 InsertOrUpdate 高性能方法 ExecuteSqlBulkCopy;
1 parent b2f484d commit 32ea064

File tree

4 files changed

+105
-15
lines changed

4 files changed

+105
-15
lines changed

Examples/base_entity/Program.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -578,6 +578,10 @@ static void Main(string[] args)
578578
BaseEntity.Initialization(fsql, () => _asyncUow.Value);
579579
#endregion
580580

581+
fsql.InsertOrUpdate<User1>()
582+
.SetSource(fsql.Select<User1>().ToList())
583+
.ExecuteSqlBulkCopy();
584+
581585
var updatejoin01 = fsql.Update<User1>()
582586
.Join(fsql.Select<UserGroup>(), (a, b) => a.GroupId == b.Id)
583587
.Set((a, b) => a.Nickname == b.GroupName)

FreeSql.DbContext/FreeSql.DbContext.xml

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

FreeSql/Internal/CommonProvider/UpdateProvider.cs

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,18 @@ public abstract partial class UpdateProvider
4141
public bool _isAutoSyncStructure;
4242

4343

44-
public static int ExecuteBulkUpdate<T1>(UpdateProvider<T1> update, NativeTuple<string, string, string, string, string[]> state, Action<IInsert<T1>> funcBulkCopy) where T1 : class
44+
public static int ExecuteBulkUpdate<T1>(UpdateProvider<T1> update, NativeTuple<string, string, string, string, string[]> state, Action<IInsert<T1>> funcBulkCopy) where T1 : class =>
45+
ExecuteBulkCommand(update._source, update._tempPrimarys, update._orm, update._connection, update._transaction, update._table, state, funcBulkCopy);
46+
public static int ExecuteBulkUpsert<T1>(InsertOrUpdateProvider<T1> upsert, NativeTuple<string, string, string, string, string[]> state, Action<IInsert<T1>> funcBulkCopy) where T1 : class =>
47+
ExecuteBulkCommand(upsert._source, upsert._tempPrimarys, upsert._orm, upsert._connection, upsert._transaction, upsert._table, state, funcBulkCopy);
48+
49+
public static int ExecuteBulkCommand<T1>(List<T1> _source, ColumnInfo[] _tempPrimarys, IFreeSql _orm, DbConnection _connection, DbTransaction _transaction, TableInfo _table,
50+
NativeTuple<string, string, string, string, string[]> state, Action<IInsert<T1>> funcBulkCopy) where T1 : class
4551
{
46-
if (update._source.Any() != true || update._tempPrimarys.Any() == false) return 0;
47-
var fsql = update._orm;
48-
var connection = update._connection;
49-
var transaction = update._transaction;
52+
if (_source.Any() != true || _tempPrimarys.Any() == false) return 0;
53+
var fsql = _orm;
54+
var connection = _connection;
55+
var transaction = _transaction;
5056

5157
Object<DbConnection> poolConn = null;
5258
if (connection == null)
@@ -61,9 +67,9 @@ public static int ExecuteBulkUpdate<T1>(UpdateProvider<T1> update, NativeTuple<s
6167
try
6268
{
6369
var insert = fsql.Insert<T1>();
64-
(insert as InsertProvider<T1>)._source.AddRange(update._source); //不能直接 AppendData,防止触发 Aop.AuditValue
70+
(insert as InsertProvider<T1>)._source.AddRange(_source); //不能直接 AppendData,防止触发 Aop.AuditValue
6571
insert
66-
.AsType(update._table.Type)
72+
.AsType(_table.Type)
6773
.WithConnection(connection)
6874
.WithTransaction(transaction)
6975
.InsertIdentity()
@@ -96,12 +102,18 @@ public static int ExecuteBulkUpdate<T1>(UpdateProvider<T1> update, NativeTuple<s
96102
}
97103
#if net40
98104
#else
99-
async public static Task<int> ExecuteBulkUpdateAsync<T1>(UpdateProvider<T1> update, NativeTuple<string, string, string, string, string[]> state, Func<IInsert<T1>, Task> funcBulkCopy) where T1 : class
105+
public static Task<int> ExecuteBulkUpdateAsync<T1>(UpdateProvider<T1> update, NativeTuple<string, string, string, string, string[]> state, Func<IInsert<T1>, Task> funcBulkCopy) where T1 : class =>
106+
ExecuteBulkCommandAsync(update._source, update._tempPrimarys, update._orm, update._connection, update._transaction, update._table, state, funcBulkCopy);
107+
public static Task<int> ExecuteBulkUpsertAsync<T1>(InsertOrUpdateProvider<T1> upsert, NativeTuple<string, string, string, string, string[]> state, Func<IInsert<T1>, Task> funcBulkCopy) where T1 : class =>
108+
ExecuteBulkCommandAsync(upsert._source, upsert._tempPrimarys, upsert._orm, upsert._connection, upsert._transaction, upsert._table, state, funcBulkCopy);
109+
110+
async public static Task<int> ExecuteBulkCommandAsync<T1>(List<T1> _source, ColumnInfo[] _tempPrimarys, IFreeSql _orm, DbConnection _connection, DbTransaction _transaction, TableInfo _table,
111+
NativeTuple<string, string, string, string, string[]> state, Func<IInsert<T1>, Task> funcBulkCopy) where T1 : class
100112
{
101-
if (update._source.Any() != true || update._tempPrimarys.Any() == false) return 0;
102-
var fsql = update._orm;
103-
var connection = update._connection;
104-
var transaction = update._transaction;
113+
if (_source.Any() != true || _tempPrimarys.Any() == false) return 0;
114+
var fsql = _orm;
115+
var connection = _connection;
116+
var transaction = _transaction;
105117

106118
Object<DbConnection> poolConn = null;
107119
if (connection == null)
@@ -116,16 +128,25 @@ async public static Task<int> ExecuteBulkUpdateAsync<T1>(UpdateProvider<T1> upda
116128
try
117129
{
118130
var insert = fsql.Insert<T1>();
119-
(insert as InsertProvider<T1>)._source.AddRange(update._source); //不能直接 AppendData,防止触发 Aop.AuditValue
131+
(insert as InsertProvider<T1>)._source.AddRange(_source); //不能直接 AppendData,防止触发 Aop.AuditValue
120132
insert
121-
.AsType(update._table.Type)
133+
.AsType(_table.Type)
122134
.WithConnection(connection)
123135
.WithTransaction(transaction)
124136
.InsertIdentity()
125137
.InsertColumns(state.Item5)
126138
.AsTable(state.Item4);
127139
(insert as InsertProvider)._isAutoSyncStructure = false;
128140
await funcBulkCopy(insert);
141+
switch (fsql.Ado.DataType)
142+
{
143+
case DataType.Oracle:
144+
case DataType.OdbcOracle:
145+
case DataType.CustomOracle:
146+
case DataType.Dameng:
147+
case DataType.OdbcDameng:
148+
return await fsql.Ado.CommandFluent(state.Item2).WithConnection(connection).WithTransaction(transaction).ExecuteNonQueryAsync();
149+
}
129150
var affrows = await fsql.Ado.CommandFluent(state.Item2 + ";\r\n" + state.Item3).WithConnection(connection).WithTransaction(transaction).ExecuteNonQueryAsync();
130151
droped = true;
131152
return affrows;

Providers/FreeSql.Provider.SqlServer/SqlServerExtensions.cs

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,55 @@ public static IFreeSql SetGlobalSelectWithLock(this IFreeSql that, SqlServerLock
120120

121121
#region ExecuteSqlBulkCopy
122122
/// <summary>
123+
/// 批量插入或更新(操作的字段数量超过 2000 时收益大)<para></para>
124+
/// 实现原理:使用 SqlBulkCopy 插入临时表,再执行 MERGE INTO t1 using (select * from #temp) ...
125+
/// </summary>
126+
/// <typeparam name="T"></typeparam>
127+
/// <param name="that"></param>
128+
/// <param name="copyOptions"></param>
129+
/// <param name="batchSize"></param>
130+
/// <param name="bulkCopyTimeout"></param>
131+
/// <returns></returns>
132+
public static int ExecuteSqlBulkCopy<T>(this IInsertOrUpdate<T> that, SqlBulkCopyOptions copyOptions = SqlBulkCopyOptions.Default, int? batchSize = null, int? bulkCopyTimeout = null) where T : class
133+
{
134+
var upsert = that as InsertOrUpdateProvider<T>;
135+
if (upsert._source.Any() != true || upsert._tempPrimarys.Any() == false) return 0;
136+
var state = ExecuteSqlBulkCopyState(upsert);
137+
return UpdateProvider.ExecuteBulkUpsert(upsert, state, insert => insert.ExecuteSqlBulkCopy(copyOptions, batchSize, bulkCopyTimeout));
138+
139+
}
140+
static NativeTuple<string, string, string, string, string[]> ExecuteSqlBulkCopyState<T>(InsertOrUpdateProvider<T> upsert) where T : class
141+
{
142+
if (upsert._source.Any() != true) return null;
143+
var _table = upsert._table;
144+
var _commonUtils = upsert._commonUtils;
145+
var updateTableName = upsert._tableRule?.Invoke(_table.DbName) ?? _table.DbName;
146+
var tempTableName = $"#Temp_{updateTableName}";
147+
if (upsert._orm.CodeFirst.IsSyncStructureToLower) tempTableName = tempTableName.ToLower();
148+
if (upsert._orm.CodeFirst.IsSyncStructureToUpper) tempTableName = tempTableName.ToUpper();
149+
if (upsert._connection == null && upsert._orm.Ado.TransactionCurrentThread != null)
150+
upsert.WithTransaction(upsert._orm.Ado.TransactionCurrentThread);
151+
var setColumns = new List<string>();
152+
var pkColumns = new List<string>();
153+
foreach (var col in _table.Columns.Values)
154+
{
155+
if (upsert._tempPrimarys.Any(a => a.CsName == col.CsName)) pkColumns.Add(col.Attribute.Name);
156+
else if (col.Attribute.IsIdentity == false && col.Attribute.IsVersion == false && upsert._updateIgnore.ContainsKey(col.Attribute.Name) == false) setColumns.Add(col.Attribute.Name);
157+
}
158+
var sql1 = $"SELECT {string.Join(", ", pkColumns.Select(a => _commonUtils.QuoteSqlName(a)))}, {string.Join(", ", setColumns.Select(a => _commonUtils.QuoteSqlName(a)))} INTO {tempTableName} FROM {_commonUtils.QuoteSqlName(updateTableName)} WHERE 1=2";
159+
try
160+
{
161+
upsert._sourceSql = $"select * from {tempTableName}";
162+
var sql2 = upsert.ToSql();
163+
var sql3 = $"DROP TABLE {tempTableName}";
164+
return NativeTuple.Create(sql1, sql2, sql3, tempTableName, pkColumns.Concat(setColumns).ToArray());
165+
}
166+
finally
167+
{
168+
upsert._sourceSql = null;
169+
}
170+
}
171+
/// <summary>
123172
/// 批量更新(更新字段数量超过 2000 时收益大)<para></para>
124173
/// 实现原理:使用 SqlBulkCopy 插入临时表,再使用 UPDATE INNER JOIN 联表更新
125174
/// </summary>
@@ -154,7 +203,7 @@ static NativeTuple<string, string, string, string, string[]> ExecuteSqlBulkCopyS
154203
if (update._tempPrimarys.Any(a => a.CsName == col.CsName)) pkColumns.Add(col.Attribute.Name);
155204
else if (col.Attribute.IsIdentity == false && col.Attribute.IsVersion == false && update._ignore.ContainsKey(col.Attribute.Name) == false) setColumns.Add(col.Attribute.Name);
156205
}
157-
var sql1 = $"SELECT {string.Join(", ", pkColumns)}, {string.Join(", ", setColumns)} INTO {tempTableName} FROM {_commonUtils.QuoteSqlName(updateTableName)} WHERE 1=2";
206+
var sql1 = $"SELECT {string.Join(", ", pkColumns.Select(a => _commonUtils.QuoteSqlName(a)))}, {string.Join(", ", setColumns.Select(a => _commonUtils.QuoteSqlName(a)))} INTO {tempTableName} FROM {_commonUtils.QuoteSqlName(updateTableName)} WHERE 1=2";
158207
var sb = new StringBuilder().Append("UPDATE ").Append(" a SET \r\n ").Append(string.Join(", \r\n ", setColumns.Select(col => $"a.{_commonUtils.QuoteSqlName(col)} = b.{_commonUtils.QuoteSqlName(col)}")));
159208
sb.Append(" \r\nFROM ").Append(_commonUtils.QuoteSqlName(updateTableName)).Append(" a ")
160209
.Append(" \r\nINNER JOIN ").Append(tempTableName).Append(" b ON ").Append(string.Join(" AND ", pkColumns.Select(col => $"a.{_commonUtils.QuoteSqlName(col)} = b.{_commonUtils.QuoteSqlName(col)}")));
@@ -263,6 +312,13 @@ public static void ExecuteSqlBulkCopy<T>(this IInsert<T> that, SqlBulkCopyOption
263312
}
264313
#if net40
265314
#else
315+
public static Task<int> ExecuteSqlBulkCopyAsync<T>(this IInsertOrUpdate<T> that, SqlBulkCopyOptions copyOptions = SqlBulkCopyOptions.Default, int? batchSize = null, int? bulkCopyTimeout = null, CancellationToken cancellationToken = default) where T : class
316+
{
317+
var upsert = that as InsertOrUpdateProvider<T>;
318+
if (upsert._source.Any() != true || upsert._tempPrimarys.Any() == false) return Task.FromResult(0);
319+
var state = ExecuteSqlBulkCopyState(upsert);
320+
return UpdateProvider.ExecuteBulkUpsertAsync(upsert, state, insert => insert.ExecuteSqlBulkCopyAsync(copyOptions, batchSize, bulkCopyTimeout, cancellationToken));
321+
}
266322
public static Task<int> ExecuteSqlBulkCopyAsync<T>(this IUpdate<T> that, SqlBulkCopyOptions copyOptions = SqlBulkCopyOptions.Default, int? batchSize = null, int? bulkCopyTimeout = null, CancellationToken cancellationToken = default) where T : class
267323
{
268324
var update = that as UpdateProvider<T>;

0 commit comments

Comments
 (0)