11// ReSharper disable once CheckNamespace
2+
3+ using System . Collections . Concurrent ;
4+
25namespace Bing . Collections ;
36
47/// <summary>
@@ -49,47 +52,111 @@ public static void ForEach<T>(this IEnumerable<T> enumerable, Action<T, int> act
4952 /// <param name="enumerable">值</param>
5053 /// <param name="action">操作</param>
5154 /// <param name="maxDegreeOfParallelism">最大并行度,默认为无限制</param>
55+ /// <param name="errorHandler">错误处理委托,返回 true 继续执行,返回 false 终止后续操作</param>
56+ /// <param name="cancellationToken">取消令牌</param>
5257 /// <exception cref="ArgumentNullException">源集合对象为空、操作表达式为空</exception>
53- public static Task ForEachAsync < T > ( this IEnumerable < T > enumerable , Func < T , Task > action , int ? maxDegreeOfParallelism = null )
58+ public static Task ForEachAsync < T > ( this IEnumerable < T > enumerable , Func < T , Task > action , int ? maxDegreeOfParallelism = null , Func < T , Exception , bool > errorHandler = null , CancellationToken cancellationToken = default )
5459 {
5560 if ( enumerable == null )
5661 throw new ArgumentNullException ( nameof ( enumerable ) , $@ "源{ typeof ( T ) . Name } 集合对象不可为空!") ;
5762 if ( action == null )
5863 throw new ArgumentNullException ( nameof ( action ) , @"操作表达式不可为空!" ) ;
5964 // 如果未指定并行度,直接并行执行所有任务
6065 if ( ! maxDegreeOfParallelism . HasValue )
61- return Task . WhenAll ( enumerable . Select ( action ) ) ;
66+ {
67+ return errorHandler == null
68+ ? Task . WhenAll ( enumerable . Select ( action ) )
69+ : HandleErrorsWithoutParallelismLimit ( enumerable , action , errorHandler , cancellationToken ) ;
70+ }
71+
6272 // 有并行度限制时,使用 SemaphoreSlim 控制并行度
63- return ParallelForEachAsync ( enumerable , action , maxDegreeOfParallelism . Value ) ;
73+ return ParallelForEachAsync ( enumerable , action , maxDegreeOfParallelism . Value , errorHandler , cancellationToken ) ;
6474 }
6575
6676 /// <summary>
67- /// 对指定集合中的每个元素执行指定操作
77+ /// 不限制并行度的情况下处理错误
78+ /// </summary>
79+ /// <typeparam name="T">类型</typeparam>
80+ /// <param name="source">源</param>
81+ /// <param name="action">操作</param>
82+ /// <param name="errorHandler">错误处理委托</param>
83+ /// <param name="cancellationToken">取消令牌</param>
84+ private static async Task HandleErrorsWithoutParallelismLimit < T > ( IEnumerable < T > source , Func < T , Task > action , Func < T , Exception , bool > errorHandler , CancellationToken cancellationToken )
85+ {
86+ var tasks = new List < Task > ( ) ;
87+ var items = source . ToList ( ) ; // 物化集合以便错误处理时能获取对应的项
88+
89+ for ( var i = 0 ; i < items . Count ; i ++ )
90+ {
91+ var item = items [ i ] ;
92+ var index = i ; // 捕获循环变量
93+
94+ tasks . Add ( Task . Run ( async ( ) =>
95+ {
96+ try
97+ {
98+ await action ( item ) ;
99+ }
100+ catch ( Exception ex ) when ( errorHandler != null )
101+ {
102+ if ( ! errorHandler ( item , ex ) )
103+ {
104+ throw ; // 如果错误处理器返回 false,重新抛出异常
105+ }
106+ }
107+ } , cancellationToken ) ) ;
108+ }
109+
110+ await Task . WhenAll ( tasks ) ;
111+ }
112+
113+ /// <summary>
114+ /// 对指定集合中的每个元素执行指定操作,并提供错误处理
68115 /// </summary>
69116 /// <typeparam name="T">类型</typeparam>
70117 /// <param name="source">源</param>
71118 /// <param name="action">操作</param>
72119 /// <param name="maxDegreeOfParallelism">最大并行度</param>
73- /// <returns></returns>
74- private static async Task ParallelForEachAsync < T > ( IEnumerable < T > source , Func < T , Task > action , int maxDegreeOfParallelism )
120+ /// <param name="errorHandler">错误处理委托</param>
121+ /// <param name="cancellationToken">取消令牌</param>
122+ private static async Task ParallelForEachAsync < T > ( IEnumerable < T > source , Func < T , Task > action , int maxDegreeOfParallelism , Func < T , Exception , bool > errorHandler , CancellationToken cancellationToken )
75123 {
76124 var semaphore = new SemaphoreSlim ( maxDegreeOfParallelism , maxDegreeOfParallelism ) ;
77125 var tasks = new List < Task > ( ) ;
78- foreach ( var item in source )
126+ var exceptions = new ConcurrentBag < ( T Item , Exception Exception ) > ( ) ;
127+ var itemsList = source . ToList ( ) ;
128+ foreach ( var item in itemsList )
79129 {
80- await semaphore . WaitAsync ( ) ;
130+ if ( cancellationToken . IsCancellationRequested )
131+ break ;
132+ await semaphore . WaitAsync ( cancellationToken ) ;
81133 tasks . Add ( Task . Run ( async ( ) =>
82134 {
83135 try
84136 {
85137 await action ( item ) ;
86138 }
139+ catch ( Exception ex ) when ( ! cancellationToken . IsCancellationRequested )
140+ {
141+ if ( errorHandler != null )
142+ {
143+ if ( ! errorHandler ( item , ex ) )
144+ exceptions . Add ( ( item , ex ) ) ;
145+ }
146+ else
147+ {
148+ exceptions . Add ( ( item , ex ) ) ;
149+ }
150+ }
87151 finally
88152 {
89153 semaphore . Release ( ) ;
90154 }
91- } ) ) ;
155+ } , cancellationToken ) ) ;
92156 }
93157 await Task . WhenAll ( tasks ) ;
158+
159+ if ( exceptions . Count > 0 )
160+ throw new AggregateException ( "在处理集合元素时发生一个或多个错误。" , exceptions . Select ( e => new InvalidOperationException ( $ "处理元素 { e . Item } 时出错: { e . Exception . Message } ", e . Exception ) ) ) ;
94161 }
95162}
0 commit comments