Skip to content

Commit 8b174d7

Browse files
committed
🎨 refactor(Enumerable): 优化 ForEach 扩展方法并添加并行处理支持
- 重写 ForEach 扩展方法,使用 foreach 循环替代数组操作 - 添加 ForEachAsync 扩展方法的重载版本,支持指定最大并行度 - 新增 ParallelForEachAsync 私有方法,用于实现并行处理逻辑 - 优化异常处理和参数验证
1 parent c39afbb commit 8b174d7

File tree

1 file changed

+41
-5
lines changed

1 file changed

+41
-5
lines changed

src/Bing.Utils/Bing/Collections/Extensions/Enumerable/Extensions.Enumerable.ForEach.cs

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,10 @@ public static void ForEach<T>(this IEnumerable<T> enumerable, Action<T, int> act
3636
throw new ArgumentNullException(nameof(enumerable), $@"源{typeof(T).Name}集合对象不可为空!");
3737
if (action == null)
3838
throw new ArgumentNullException(nameof(action), @"操作表达式不可为空!");
39-
var array = enumerable.ToArray();
40-
for (var i = 0; i < array.Length; i++)
41-
action(array[i], i);
39+
40+
var index = 0;
41+
foreach (var item in enumerable)
42+
action(item, index++);
4243
}
4344

4445
/// <summary>
@@ -47,13 +48,48 @@ public static void ForEach<T>(this IEnumerable<T> enumerable, Action<T, int> act
4748
/// <typeparam name="T">类型</typeparam>
4849
/// <param name="enumerable">值</param>
4950
/// <param name="action">操作</param>
51+
/// <param name="maxDegreeOfParallelism">最大并行度,默认为无限制</param>
5052
/// <exception cref="ArgumentNullException">源集合对象为空、操作表达式为空</exception>
51-
public static Task ForEachAsync<T>(this IEnumerable<T> enumerable, Func<T, Task> action)
53+
public static Task ForEachAsync<T>(this IEnumerable<T> enumerable, Func<T, Task> action, int? maxDegreeOfParallelism = null)
5254
{
5355
if (enumerable == null)
5456
throw new ArgumentNullException(nameof(enumerable), $@"源{typeof(T).Name}集合对象不可为空!");
5557
if (action == null)
5658
throw new ArgumentNullException(nameof(action), @"操作表达式不可为空!");
57-
return Task.WhenAll(from item in enumerable select Task.Run(() => action(item)));
59+
// 如果未指定并行度,直接并行执行所有任务
60+
if (!maxDegreeOfParallelism.HasValue)
61+
return Task.WhenAll(enumerable.Select(action));
62+
// 有并行度限制时,使用 SemaphoreSlim 控制并行度
63+
return ParallelForEachAsync(enumerable, action, maxDegreeOfParallelism.Value);
64+
}
65+
66+
/// <summary>
67+
/// 对指定集合中的每个元素执行指定操作
68+
/// </summary>
69+
/// <typeparam name="T">类型</typeparam>
70+
/// <param name="source">源</param>
71+
/// <param name="action">操作</param>
72+
/// <param name="maxDegreeOfParallelism">最大并行度</param>
73+
/// <returns></returns>
74+
private static async Task ParallelForEachAsync<T>(IEnumerable<T> source, Func<T, Task> action, int maxDegreeOfParallelism)
75+
{
76+
var semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);
77+
var tasks = new List<Task>();
78+
foreach (var item in source)
79+
{
80+
await semaphore.WaitAsync();
81+
tasks.Add(Task.Run(async () =>
82+
{
83+
try
84+
{
85+
await action(item);
86+
}
87+
finally
88+
{
89+
semaphore.Release();
90+
}
91+
}));
92+
}
93+
await Task.WhenAll(tasks);
5894
}
5995
}

0 commit comments

Comments
 (0)