Skip to content

Commit 42b9d7b

Browse files
committed
- 增加 IInsertOrUpdate 高性能方法 ExecutePgCopy;
1 parent 09f3f1a commit 42b9d7b

File tree

2 files changed

+61
-9
lines changed

2 files changed

+61
-9
lines changed

Examples/base_entity/Program.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -545,15 +545,15 @@ static void Main(string[] args)
545545

546546
.UseConnectionString(FreeSql.DataType.SqlServer, "Data Source=.;Integrated Security=True;Initial Catalog=freesqlTest;Pooling=true;Max Pool Size=3;TrustServerCertificate=true")
547547

548-
//.UseConnectionString(FreeSql.DataType.PostgreSQL, "Host=192.168.164.10;Port=5432;Username=postgres;Password=123456;Database=tedb;Pooling=true;Maximum Pool Size=2")
548+
.UseConnectionString(FreeSql.DataType.PostgreSQL, "Host=192.168.164.10;Port=5432;Username=postgres;Password=123456;Database=tedb;Pooling=true;Maximum Pool Size=2")
549549
//.UseConnectionString(FreeSql.DataType.PostgreSQL, "Host=192.168.164.10;Port=5432;Username=postgres;Password=123456;Database=toc;Pooling=true;Maximum Pool Size=2")
550-
//.UseNameConvert(FreeSql.Internal.NameConvertType.ToLower)
550+
.UseNameConvert(FreeSql.Internal.NameConvertType.ToLower)
551551

552552
//.UseConnectionString(FreeSql.DataType.Oracle, "user id=user1;password=123456;data source=//127.0.0.1:1521/XE;Pooling=true;Max Pool Size=2")
553553
//.UseNameConvert(FreeSql.Internal.NameConvertType.ToUpper)
554554

555-
.UseConnectionString(FreeSql.DataType.Dameng, "server=127.0.0.1;port=5236;user id=2user;password=123456789;database=2user;poolsize=5;min pool size=1")
556-
.UseNameConvert(FreeSql.Internal.NameConvertType.ToUpper)
555+
//.UseConnectionString(FreeSql.DataType.Dameng, "server=127.0.0.1;port=5236;user id=2user;password=123456789;database=2user;poolsize=5;min pool size=1")
556+
//.UseNameConvert(FreeSql.Internal.NameConvertType.ToUpper)
557557

558558
//.UseConnectionString(FreeSql.DataType.OdbcMySql, "Driver={MySQL ODBC 8.0 Unicode Driver};Server=127.0.0.1;Persist Security Info=False;Trusted_Connection=Yes;UID=root;PWD=root;DATABASE=cccddd_odbc;Charset=utf8;SslMode=none;Max pool size=2")
559559

@@ -578,15 +578,15 @@ static void Main(string[] args)
578578
BaseEntity.Initialization(fsql, () => _asyncUow.Value);
579579
#endregion
580580

581-
fsql.CodeFirst.GetTableByEntity(typeof(User1)).Columns.Values.ToList().ForEach(col =>
582-
{
583-
col.Comment = "";
584-
});
581+
//fsql.CodeFirst.GetTableByEntity(typeof(User1)).Columns.Values.ToList().ForEach(col =>
582+
//{
583+
// col.Comment = "";
584+
//});
585585
fsql.Insert(Enumerable.Range(0, 100).Select(a => new User1 { Id = Guid.NewGuid(), Nickname = $"nickname{a}", Username = $"username{a}", Description = $"desc{a}" }).ToArray()).ExecuteAffrows();
586586

587587
fsql.InsertOrUpdate<User1>()
588588
.SetSource(fsql.Select<User1>().ToList())
589-
.ExecuteDmBulkCopy();
589+
.ExecutePgCopy();
590590

591591
var updatejoin01 = fsql.Update<User1>()
592592
.Join(fsql.Select<UserGroup>(), (a, b) => a.GroupId == b.Id)

Providers/FreeSql.Provider.PostgreSQL/PostgreSQLExtensions.cs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,51 @@ public static partial class FreeSqlPostgreSQLGlobalExtensions
3737

3838
#region ExecutePgCopy
3939
/// <summary>
40+
/// 批量插入或更新(操作的字段数量超过 2000 时收益大)<para></para>
41+
/// 实现原理:使用 PgCopy 插入临时表,再执行 INSERT INTO t1 select * from #temp ON CONFLICT(""id"") DO UPDATE SET ...
42+
/// </summary>
43+
/// <typeparam name="T"></typeparam>
44+
/// <param name="that"></param>
45+
/// <returns></returns>
46+
public static int ExecutePgCopy<T>(this IInsertOrUpdate<T> that) where T : class
47+
{
48+
var upsert = that as InsertOrUpdateProvider<T>;
49+
if (upsert._source.Any() != true || upsert._tempPrimarys.Any() == false) return 0;
50+
var state = ExecutePgCopyState(upsert);
51+
return UpdateProvider.ExecuteBulkUpsert(upsert, state, insert => insert.ExecutePgCopy());
52+
}
53+
static NativeTuple<string, string, string, string, string[]> ExecutePgCopyState<T>(InsertOrUpdateProvider<T> upsert) where T : class
54+
{
55+
if (upsert._source.Any() != true) return null;
56+
var _table = upsert._table;
57+
var _commonUtils = upsert._commonUtils;
58+
var updateTableName = upsert._tableRule?.Invoke(_table.DbName) ?? _table.DbName;
59+
var tempTableName = $"Temp_{Guid.NewGuid().ToString("N")}";
60+
if (upsert._orm.CodeFirst.IsSyncStructureToLower) tempTableName = tempTableName.ToLower();
61+
if (upsert._orm.CodeFirst.IsSyncStructureToUpper) tempTableName = tempTableName.ToUpper();
62+
if (upsert._connection == null && upsert._orm.Ado.TransactionCurrentThread != null)
63+
upsert.WithTransaction(upsert._orm.Ado.TransactionCurrentThread);
64+
var sb = new StringBuilder().Append("CREATE TEMP TABLE ").Append(_commonUtils.QuoteSqlName(tempTableName)).Append(" ( ");
65+
foreach (var col in _table.Columns.Values)
66+
{
67+
sb.Append(" \r\n ").Append(_commonUtils.QuoteSqlName(col.Attribute.Name)).Append(" ").Append(col.Attribute.DbType.Replace("NOT NULL", ""));
68+
sb.Append(",");
69+
}
70+
var sql1 = sb.Remove(sb.Length - 1, 1).Append("\r\n) WITH (OIDS=FALSE);").ToString();
71+
sb.Clear();
72+
try
73+
{
74+
upsert._sourceSql = $"select * from {tempTableName}";
75+
var sql2 = upsert.ToSql();
76+
var sql3 = $"DROP TABLE {_commonUtils.QuoteSqlName(tempTableName)}";
77+
return NativeTuple.Create(sql1, sql2, sql3, tempTableName, _table.Columns.Values.Select(a => a.Attribute.Name).ToArray());
78+
}
79+
finally
80+
{
81+
upsert._sourceSql = null;
82+
}
83+
}
84+
/// <summary>
4085
/// 批量更新(更新字段数量超过 2000 时收益大)<para></para>
4186
/// 实现原理:使用 PgCopy 插入临时表,再使用 UPDATE INNER JOIN 联表更新
4287
/// </summary>
@@ -173,6 +218,13 @@ public static void ExecutePgCopy<T>(this IInsert<T> that) where T : class
173218

174219
#if net45
175220
#else
221+
public static Task<int> ExecutePgCopyAsync<T>(this IInsertOrUpdate<T> that, CancellationToken cancellationToken = default) where T : class
222+
{
223+
var upsert = that as UpdateProvider<T>;
224+
if (upsert._source.Any() != true || upsert._tempPrimarys.Any() == false) return Task.FromResult(0);
225+
var state = ExecutePgCopyState(upsert);
226+
return UpdateProvider.ExecuteBulkUpdateAsync(upsert, state, insert => insert.ExecutePgCopyAsync(cancellationToken));
227+
}
176228
public static Task<int> ExecutePgCopyAsync<T>(this IUpdate<T> that, CancellationToken cancellationToken = default) where T : class
177229
{
178230
var update = that as UpdateProvider<T>;

0 commit comments

Comments
 (0)