|
1 | | -public static partial class FreeSqlKingbaseESGlobalExtensions |
| 1 | +using FreeSql; |
| 2 | +using FreeSql.Internal.CommonProvider; |
| 3 | +using FreeSql.Internal.Model; |
| 4 | +using Kdbndp; |
| 5 | +using System; |
| 6 | +using System.Collections.Generic; |
| 7 | +using System.Data; |
| 8 | +using System.Linq; |
| 9 | +using System.Text; |
| 10 | + |
| 11 | +public static partial class FreeSqlKingbaseESGlobalExtensions |
2 | 12 | { |
3 | 13 |
|
4 | 14 | /// <summary> |
|
9 | 19 | /// <returns></returns> |
10 | 20 | public static string FormatKingbaseES(this string that, params object[] args) => _kingbaseesAdo.Addslashes(that, args); |
11 | 21 | static FreeSql.KingbaseES.KingbaseESAdo _kingbaseesAdo = new FreeSql.KingbaseES.KingbaseESAdo(); |
| 22 | + |
| 23 | + #region ExecuteKdbCopy |
| 24 | + /// <summary> |
| 25 | + /// 批量插入或更新(操作的字段数量超过 2000 时收益大)<para></para> |
| 26 | + /// 实现原理:使用 Copy 插入临时表,再执行 INSERT INTO t1 select * from #temp ON CONFLICT(""id"") DO UPDATE SET ... |
| 27 | + /// </summary> |
| 28 | + /// <typeparam name="T"></typeparam> |
| 29 | + /// <param name="that"></param> |
| 30 | + /// <returns></returns> |
| 31 | + public static int ExecuteKdbCopy<T>(this IInsertOrUpdate<T> that) where T : class |
| 32 | + { |
| 33 | + var upsert = that as InsertOrUpdateProvider<T>; |
| 34 | + if (upsert._source.Any() != true || upsert._tempPrimarys.Any() == false) return 0; |
| 35 | + var state = ExecuteKdbCopyState(upsert); |
| 36 | + return UpdateProvider.ExecuteBulkUpsert(upsert, state, insert => insert.ExecuteKdbCopy()); |
| 37 | + } |
| 38 | + static NativeTuple<string, string, string, string, string[]> ExecuteKdbCopyState<T>(InsertOrUpdateProvider<T> upsert) where T : class |
| 39 | + { |
| 40 | + if (upsert._source.Any() != true) return null; |
| 41 | + var _table = upsert._table; |
| 42 | + var _commonUtils = upsert._commonUtils; |
| 43 | + var updateTableName = upsert._tableRule?.Invoke(_table.DbName) ?? _table.DbName; |
| 44 | + var tempTableName = $"Temp_{Guid.NewGuid().ToString("N")}"; |
| 45 | + if (upsert._orm.CodeFirst.IsSyncStructureToLower) tempTableName = tempTableName.ToLower(); |
| 46 | + if (upsert._orm.CodeFirst.IsSyncStructureToUpper) tempTableName = tempTableName.ToUpper(); |
| 47 | + if (upsert._connection == null && upsert._orm.Ado.TransactionCurrentThread != null) |
| 48 | + upsert.WithTransaction(upsert._orm.Ado.TransactionCurrentThread); |
| 49 | + var sb = new StringBuilder().Append("CREATE TEMP TABLE ").Append(_commonUtils.QuoteSqlName(tempTableName)).Append(" ( "); |
| 50 | + foreach (var col in _table.Columns.Values) |
| 51 | + { |
| 52 | + sb.Append(" \r\n ").Append(_commonUtils.QuoteSqlName(col.Attribute.Name)).Append(" ").Append(col.Attribute.DbType.Replace("NOT NULL", "")); |
| 53 | + sb.Append(","); |
| 54 | + } |
| 55 | + var sql1 = sb.Remove(sb.Length - 1, 1).Append("\r\n) WITH (OIDS=FALSE);").ToString(); |
| 56 | + sb.Clear(); |
| 57 | + try |
| 58 | + { |
| 59 | + upsert._sourceSql = $"select * from {tempTableName}"; |
| 60 | + var sql2 = upsert.ToSql(); |
| 61 | + var sql3 = $"DROP TABLE {_commonUtils.QuoteSqlName(tempTableName)}"; |
| 62 | + return NativeTuple.Create(sql1, sql2, sql3, tempTableName, _table.Columns.Values.Select(a => a.Attribute.Name).ToArray()); |
| 63 | + } |
| 64 | + finally |
| 65 | + { |
| 66 | + upsert._sourceSql = null; |
| 67 | + } |
| 68 | + } |
| 69 | + /// <summary> |
| 70 | + /// 批量更新(更新字段数量超过 2000 时收益大)<para></para> |
| 71 | + /// 实现原理:使用 Copy 插入临时表,再使用 UPDATE INNER JOIN 联表更新 |
| 72 | + /// </summary> |
| 73 | + /// <typeparam name="T"></typeparam> |
| 74 | + /// <param name="that"></param> |
| 75 | + /// <returns></returns> |
| 76 | + public static int ExecuteKdbCopy<T>(this IUpdate<T> that) where T : class |
| 77 | + { |
| 78 | + var update = that as UpdateProvider<T>; |
| 79 | + if (update._source.Any() != true || update._tempPrimarys.Any() == false) return 0; |
| 80 | + var state = ExecuteKdbCopyState(update); |
| 81 | + return UpdateProvider.ExecuteBulkUpdate(update, state, insert => insert.ExecuteKdbCopy()); |
| 82 | + } |
| 83 | + static NativeTuple<string, string, string, string, string[]> ExecuteKdbCopyState<T>(UpdateProvider<T> update) where T : class |
| 84 | + { |
| 85 | + if (update._source.Any() != true) return null; |
| 86 | + var _table = update._table; |
| 87 | + var _commonUtils = update._commonUtils; |
| 88 | + var updateTableName = update._tableRule?.Invoke(_table.DbName) ?? _table.DbName; |
| 89 | + var tempTableName = $"Temp_{Guid.NewGuid().ToString("N")}"; |
| 90 | + if (update._orm.CodeFirst.IsSyncStructureToLower) tempTableName = tempTableName.ToLower(); |
| 91 | + if (update._orm.CodeFirst.IsSyncStructureToUpper) tempTableName = tempTableName.ToUpper(); |
| 92 | + if (update._connection == null && update._orm.Ado.TransactionCurrentThread != null) |
| 93 | + update.WithTransaction(update._orm.Ado.TransactionCurrentThread); |
| 94 | + var sb = new StringBuilder().Append("CREATE TEMP TABLE ").Append(_commonUtils.QuoteSqlName(tempTableName)).Append(" ( "); |
| 95 | + var setColumns = new List<string>(); |
| 96 | + var pkColumns = new List<string>(); |
| 97 | + foreach (var col in _table.Columns.Values) |
| 98 | + { |
| 99 | + if (update._tempPrimarys.Any(a => a.CsName == col.CsName)) pkColumns.Add(col.Attribute.Name); |
| 100 | + else if (col.Attribute.IsIdentity == false && col.Attribute.IsVersion == false && update._ignore.ContainsKey(col.Attribute.Name) == false) setColumns.Add(col.Attribute.Name); |
| 101 | + else continue; |
| 102 | + sb.Append(" \r\n ").Append(_commonUtils.QuoteSqlName(col.Attribute.Name)).Append(" ").Append(col.Attribute.DbType.Replace("NOT NULL", "")); |
| 103 | + sb.Append(","); |
| 104 | + } |
| 105 | + var sql1 = sb.Remove(sb.Length - 1, 1).Append("\r\n) WITH (OIDS=FALSE);").ToString(); |
| 106 | + |
| 107 | + sb.Clear().Append("UPDATE ").Append(_commonUtils.QuoteSqlName(updateTableName)).Append(" a ") |
| 108 | + .Append("\r\nSET \r\n ").Append(string.Join(", \r\n ", setColumns.Select(col => $"{_commonUtils.QuoteSqlName(col)} = b.{_commonUtils.QuoteSqlName(col)}"))) |
| 109 | + .Append("\r\nFROM ").Append(_commonUtils.QuoteSqlName(tempTableName)).Append(" b ") |
| 110 | + .Append("\r\nWHERE ").Append(string.Join(" AND ", pkColumns.Select(col => $"a.{_commonUtils.QuoteSqlName(col)} = b.{_commonUtils.QuoteSqlName(col)}"))); |
| 111 | + var sql2 = sb.ToString(); |
| 112 | + sb.Clear(); |
| 113 | + var sql3 = $"DROP TABLE {_commonUtils.QuoteSqlName(tempTableName)}"; |
| 114 | + return NativeTuple.Create(sql1, sql2, sql3, tempTableName, pkColumns.Concat(setColumns).ToArray()); |
| 115 | + } |
| 116 | + |
| 117 | + /// <summary> |
| 118 | + /// KingbaseES COPY 批量导入功能,封装了 KdbndpConnection.BeginBinaryImport 方法<para></para> |
| 119 | + /// 使用 IgnoreColumns/InsertColumns 设置忽略/指定导入的列<para></para> |
| 120 | + /// 使用 WithConnection/WithTransaction 传入连接/事务对象<para></para> |
| 121 | + /// 提示:若本方法不能满足,请使用 IInsert<T>.ToDataTable 方法得到 DataTable 对象后,自行处理。<para></para> |
| 122 | + /// COPY 与 insert into t values(..),(..),(..) 性能测试参考:<para></para> |
| 123 | + /// 插入180000行,52列:10,090ms 与 46,756ms,10列:4,081ms 与 9,786ms<para></para> |
| 124 | + /// 插入10000行,52列:583ms 与 3,294ms,10列:167ms 与 568ms<para></para> |
| 125 | + /// 插入5000行,52列:337ms 与 2,269ms,10列:93ms 与 366ms<para></para> |
| 126 | + /// 插入2000行,52列:136ms 与 1,019ms,10列:39ms 与 157ms<para></para> |
| 127 | + /// 插入1000行,52列:88ms 与 374ms,10列:21ms 与 102ms<para></para> |
| 128 | + /// 插入500行,52列:61ms 与 209ms,10列:12ms 与 34ms<para></para> |
| 129 | + /// 插入100行,52列:30ms 与 51ms,10列:4ms 与 9ms<para></para> |
| 130 | + /// 插入50行,52列:25ms 与 37ms,10列:2ms 与 6ms<para></para> |
| 131 | + /// </summary> |
| 132 | + /// <typeparam name="T"></typeparam> |
| 133 | + /// <param name="that"></param> |
| 134 | + public static void ExecuteKdbCopy<T>(this IInsert<T> that) where T : class |
| 135 | + { |
| 136 | + var insert = that as FreeSql.KingbaseES.KingbaseESInsert<T>; |
| 137 | + if (insert == null) throw new Exception(CoreStrings.S_Features_Unique("ExecuteKdbCopy", "KingbaseES")); |
| 138 | + |
| 139 | + var dt = that.ToDataTable(); |
| 140 | + if (dt.Rows.Count == 0) return; |
| 141 | + |
| 142 | + Action<KdbndpConnection> binaryImport = conn => |
| 143 | + { |
| 144 | + var copyFromCommand = new StringBuilder().Append("COPY ").Append(insert.InternalCommonUtils.QuoteSqlName(dt.TableName)).Append("("); |
| 145 | + var colIndex = 0; |
| 146 | + foreach (DataColumn col in dt.Columns) |
| 147 | + { |
| 148 | + if (colIndex++ > 0) copyFromCommand.Append(", "); |
| 149 | + copyFromCommand.Append(insert.InternalCommonUtils.QuoteSqlName(col.ColumnName)); |
| 150 | + } |
| 151 | + copyFromCommand.Append(") FROM STDIN BINARY"); |
| 152 | + using (var writer = conn.BeginBinaryImport(copyFromCommand.ToString())) |
| 153 | + { |
| 154 | + foreach (DataRow item in dt.Rows) |
| 155 | + writer.WriteRow(item.ItemArray); |
| 156 | + writer.Complete(); |
| 157 | + } |
| 158 | + copyFromCommand.Clear(); |
| 159 | + }; |
| 160 | + |
| 161 | + try |
| 162 | + { |
| 163 | + if (insert.InternalConnection == null && insert.InternalTransaction == null) |
| 164 | + { |
| 165 | + using (var conn = insert.InternalOrm.Ado.MasterPool.Get()) |
| 166 | + { |
| 167 | + binaryImport(conn.Value as KdbndpConnection); |
| 168 | + } |
| 169 | + } |
| 170 | + else if (insert.InternalTransaction != null) |
| 171 | + { |
| 172 | + binaryImport(insert.InternalTransaction.Connection as KdbndpConnection); |
| 173 | + } |
| 174 | + else if (insert.InternalConnection != null) |
| 175 | + { |
| 176 | + var conn = insert.InternalConnection as KdbndpConnection; |
| 177 | + var isNotOpen = false; |
| 178 | + if (conn.State != System.Data.ConnectionState.Open) |
| 179 | + { |
| 180 | + isNotOpen = true; |
| 181 | + conn.Open(); |
| 182 | + } |
| 183 | + try |
| 184 | + { |
| 185 | + binaryImport(conn); |
| 186 | + } |
| 187 | + finally |
| 188 | + { |
| 189 | + if (isNotOpen) |
| 190 | + conn.Close(); |
| 191 | + } |
| 192 | + } |
| 193 | + else |
| 194 | + { |
| 195 | + throw new NotImplementedException($"ExecuteKdbCopy {CoreStrings.S_Not_Implemented_FeedBack}"); |
| 196 | + } |
| 197 | + } |
| 198 | + finally |
| 199 | + { |
| 200 | + dt.Clear(); |
| 201 | + } |
| 202 | + } |
| 203 | + #endregion |
12 | 204 | } |
0 commit comments