File tree Expand file tree Collapse file tree 5 files changed +61
-8
lines changed
Expand file tree Collapse file tree 5 files changed +61
-8
lines changed Original file line number Diff line number Diff line change @@ -15,6 +15,9 @@ namespace XCode.Cache;
1515
1616 /// <summary>表名</summary>
1717 public String TableName { get ; set ; } = null ! ;
18+
19+ /// <summary>当前实体类专用的惰性消费者。让缓存更新异步串行执行,避免并行阻塞数据库</summary>
20+ protected LazyConsumer Consumer { get ; } = new ( ) ;
1821 #endregion
1922
2023 /// <summary>调用委托方法前设置连接名和表名,调用后还原</summary>
Original file line number Diff line number Diff line change @@ -159,7 +159,7 @@ void UpdateCacheAsync(String reason)
159159
160160 reason = $ "异步更新缓存,{ reason } ";
161161 WriteLog ( reason ) ;
162- Task . Run ( ( ) => UpdateCache ( reason , true ) ) ;
162+ Consumer . Run ( ( ) => UpdateCache ( reason , true ) ) ;
163163 }
164164
165165 void UpdateCache ( String reason , Boolean isAsync = false )
Original file line number Diff line number Diff line change 1+ using System . Collections . Concurrent ;
2+
3+ namespace XCode . Cache ;
4+
5+ /// <summary>
6+ /// 惰性串行消费者:仅在有任务时启动后台 Task,任务处理完自动退出,空闲时不占线程。
7+ /// 所有任务严格按入队顺序串行执行。
8+ /// </summary>
9+ public class LazyConsumer
10+ {
11+ private readonly ConcurrentQueue < Action > _queue = new ( ) ;
12+ private volatile Int32 _processing = 0 ; // 0: 空闲, 1: 正在处理
13+
14+ /// <summary>
15+ /// 提交一个任务,由内部串行执行
16+ /// </summary>
17+ public void Run ( Action item )
18+ {
19+ _queue . Enqueue ( item ) ;
20+
21+ // 尝试启动处理任务(仅当当前无活跃任务)
22+ if ( Interlocked . CompareExchange ( ref _processing , 1 , 0 ) == 0 )
23+ {
24+ // 成功抢到启动权
25+ _ = Task . Run ( ProcessQueue ) ;
26+ }
27+ }
28+
29+ private void ProcessQueue ( )
30+ {
31+ try
32+ {
33+ // 一次性处理当前所有任务(drain)
34+ while ( _queue . TryDequeue ( out var action ) )
35+ {
36+ try
37+ {
38+ action ( ) ;
39+ }
40+ catch
41+ {
42+ // 可选:记录日志。此处不抛出,避免中断队列
43+ }
44+ }
45+ }
46+ catch { }
47+ finally
48+ {
49+ // 标记为空闲
50+ _processing = 0 ;
51+ }
52+ }
53+ }
Original file line number Diff line number Diff line change @@ -296,7 +296,7 @@ public void ShowStatics()
296296 {
297297 // 频繁更新下,采用异步更新缓存,以提升吞吐。非频繁访问时(2倍超时),同步更新
298298 if ( sec < Expire )
299- Task . Run ( ( ) => UpdateData ( ci , key ) ) ;
299+ Consumer . Run ( ( ) => UpdateData ( ci , key ) ) ;
300300 else
301301 UpdateData ( ci , key ) ;
302302 }
Original file line number Diff line number Diff line change 88namespace XCode ;
99
1010/// <summary>实体队列。支持凑批更新数据,包括Insert/Update/Delete/Upsert</summary>
11- public class EntityQueue : DisposeBase
11+ public class EntityQueue ( IEntitySession session ) : DisposeBase
1212{
1313 #region 属性
1414 /// <summary>需要近实时保存的实体队列</summary>
@@ -21,7 +21,7 @@ public class EntityQueue : DisposeBase
2121 public Boolean Debug { get ; set ; }
2222
2323 /// <summary>数据会话,分表分库时使用</summary>
24- public IEntitySession Session { get ; }
24+ public IEntitySession Session { get ; } = session ;
2525
2626 /// <summary>是否仅插入。默认false</summary>
2727 public Boolean InsertOnly { get ; set ; }
@@ -48,12 +48,9 @@ public class EntityQueue : DisposeBase
4848
4949 private TimerX ? _Timer ;
5050 private String ? _lastTraceId ;
51- #endregion
5251
52+ #endregion
5353 #region 构造
54- /// <summary>实例化实体队列</summary>
55- public EntityQueue ( IEntitySession session ) => Session = session ;
56-
5754 /// <summary>销毁时,持久化队列</summary>
5855 /// <param name="disposing"></param>
5956 protected override void Dispose ( Boolean disposing )
You can’t perform that action at this time.
0 commit comments