|
18 | 18 | public static class FreeSqlMySqlConnectorGlobalExtensions |
19 | 19 | { |
20 | 20 | #region ExecuteMySqlBulkCopy |
21 | | - |
| 21 | + /// <summary> |
| 22 | + /// 批量插入或更新(操作的字段数量超过 2000 时收益大)<para></para> |
| 23 | + /// 实现原理:使用 MySqlBulkCopy 插入临时表,再执行 INSERT INTO t1 select * from #temp ON DUPLICATE KEY UPDATE ... |
| 24 | + /// </summary> |
| 25 | + /// <typeparam name="T"></typeparam> |
| 26 | + /// <param name="that"></param> |
| 27 | + /// <param name="bulkCopyTimeout"></param> |
| 28 | + /// <returns></returns> |
| 29 | + public static int ExecuteMySqlBulkCopy<T>(this IInsertOrUpdate<T> that, int? bulkCopyTimeout = null) where T : class |
| 30 | + { |
| 31 | + var upsert = that as InsertOrUpdateProvider<T>; |
| 32 | + if (upsert._source.Any() != true || upsert._tempPrimarys.Any() == false) return 0; |
| 33 | + var state = ExecuteMySqlBulkCopyState(upsert); |
| 34 | + return UpdateProvider.ExecuteBulkUpsert(upsert, state, insert => insert.ExecuteMySqlBulkCopy(bulkCopyTimeout)); |
| 35 | + } |
| 36 | + static NativeTuple<string, string, string, string, string[]> ExecuteMySqlBulkCopyState<T>(InsertOrUpdateProvider<T> upsert) where T : class |
| 37 | + { |
| 38 | + if (upsert._source.Any() != true) return null; |
| 39 | + var _table = upsert._table; |
| 40 | + var _commonUtils = upsert._commonUtils; |
| 41 | + var updateTableName = upsert._tableRule?.Invoke(_table.DbName) ?? _table.DbName; |
| 42 | + var tempTableName = $"Temp_{Guid.NewGuid().ToString("N")}"; |
| 43 | + if (upsert._orm.CodeFirst.IsSyncStructureToLower) tempTableName = tempTableName.ToLower(); |
| 44 | + if (upsert._orm.CodeFirst.IsSyncStructureToUpper) tempTableName = tempTableName.ToUpper(); |
| 45 | + if (upsert._connection == null && upsert._orm.Ado.TransactionCurrentThread != null) |
| 46 | + upsert.WithTransaction(upsert._orm.Ado.TransactionCurrentThread); |
| 47 | + var sb = new StringBuilder().Append("CREATE TEMPORARY TABLE ").Append(_commonUtils.QuoteSqlName(tempTableName)).Append(" ( "); |
| 48 | + foreach (var col in _table.Columns.Values) |
| 49 | + { |
| 50 | + sb.Append(" \r\n ").Append(_commonUtils.QuoteSqlName(col.Attribute.Name)).Append(" ").Append(col.Attribute.DbType.Replace("NOT NULL", "")); |
| 51 | + sb.Append(","); |
| 52 | + } |
| 53 | + var sql1 = sb.Remove(sb.Length - 1, 1).Append(" \r\n) Engine=InnoDB;").ToString(); |
| 54 | + try |
| 55 | + { |
| 56 | + upsert._sourceSql = $"select __**__ from {tempTableName}"; |
| 57 | + var sql2 = upsert.ToSql(); |
| 58 | + if (string.IsNullOrWhiteSpace(sql2) == false) |
| 59 | + { |
| 60 | + var field = sql2.Substring(sql2.IndexOf("`(") + 2); |
| 61 | + field = field.Remove(field.IndexOf(upsert._sourceSql)).TrimEnd().TrimEnd(')'); |
| 62 | + sql2 = sql2.Replace(upsert._sourceSql, $"select {field} from {tempTableName}"); |
| 63 | + } |
| 64 | + var sql3 = $"DROP TABLE {_commonUtils.QuoteSqlName(tempTableName)}"; |
| 65 | + return NativeTuple.Create(sql1, sql2, sql3, tempTableName, _table.Columns.Values.Select(a => a.Attribute.Name).ToArray()); |
| 66 | + } |
| 67 | + finally |
| 68 | + { |
| 69 | + upsert._sourceSql = null; |
| 70 | + } |
| 71 | + } |
22 | 72 | /// <summary> |
23 | 73 | /// 批量更新(更新字段数量超过 2000 时收益大)<para></para> |
24 | 74 | /// 实现原理:使用 MySqlBulkCopy 插入临时表,再使用 UPDATE INNER JOIN 联表更新 |
@@ -147,6 +197,13 @@ public static void ExecuteMySqlBulkCopy<T>(this IInsert<T> that, int? bulkCopyTi |
147 | 197 | } |
148 | 198 | #if net40 |
149 | 199 | #else |
| 200 | + public static Task<int> ExecuteMySqlBulkCopyAsync<T>(this IInsertOrUpdate<T> that, int? bulkCopyTimeout = null, CancellationToken cancellationToken = default) where T : class |
| 201 | + { |
| 202 | + var upsert = that as UpdateProvider<T>; |
| 203 | + if (upsert._source.Any() != true || upsert._tempPrimarys.Any() == false) return Task.FromResult(0); |
| 204 | + var state = ExecuteMySqlBulkCopyState(upsert); |
| 205 | + return UpdateProvider.ExecuteBulkUpdateAsync(upsert, state, insert => insert.ExecuteMySqlBulkCopyAsync(bulkCopyTimeout, cancellationToken)); |
| 206 | + } |
150 | 207 | public static Task<int> ExecuteMySqlBulkCopyAsync<T>(this IUpdate<T> that, int? bulkCopyTimeout = null, CancellationToken cancellationToken = default) where T : class |
151 | 208 | { |
152 | 209 | var update = that as UpdateProvider<T>; |
|
0 commit comments