Skip to content

Commit 32ea3e9

Browse files
authored
Merge pull request #312 from thomhurst/fix/maximize-parallel-throughput
fix: Maximize parallel throughput
2 parents 7254f25 + 2f4caa3 commit 32ea3e9

File tree

7 files changed

+75
-97
lines changed

7 files changed

+75
-97
lines changed

EnumerableAsyncProcessor/RunnableProcessors/AsyncEnumerable/AsyncEnumerableParallelProcessor.cs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ internal AsyncEnumerableParallelProcessor(
2525
public async Task ExecuteAsync()
2626
{
2727
var cancellationToken = _cancellationTokenSource.Token;
28-
var semaphore = new SemaphoreSlim(_maxConcurrency, _maxConcurrency);
28+
using var semaphore = new SemaphoreSlim(_maxConcurrency, _maxConcurrency);
2929
var tasks = new List<Task>();
3030

3131
try
@@ -39,8 +39,7 @@ public async Task ExecuteAsync()
3939
{
4040
try
4141
{
42-
// Yield to ensure we don't block the thread if _taskSelector is synchronous
43-
await Task.Yield();
42+
// Removed Task.Yield - parallelism is now handled at the processor level
4443
await _taskSelector(capturedItem).ConfigureAwait(false);
4544
}
4645
finally
@@ -51,12 +50,15 @@ public async Task ExecuteAsync()
5150

5251
tasks.Add(task);
5352
}
54-
55-
await Task.WhenAll(tasks).ConfigureAwait(false);
5653
}
5754
finally
5855
{
59-
semaphore.Dispose();
56+
// Always wait for all tasks to complete before the using block disposes the semaphore
57+
// This ensures the semaphore is not disposed while tasks are still running
58+
if (tasks.Count > 0)
59+
{
60+
await Task.WhenAll(tasks).ConfigureAwait(false);
61+
}
6062
}
6163
}
6264
}

EnumerableAsyncProcessor/RunnableProcessors/AsyncEnumerable/ResultProcessors/ResultAsyncEnumerableParallelProcessor.cs

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ internal ResultAsyncEnumerableParallelProcessor(
2626
public async IAsyncEnumerable<TOutput> ExecuteAsync()
2727
{
2828
var cancellationToken = _cancellationTokenSource.Token;
29-
var semaphore = new SemaphoreSlim(_maxConcurrency, _maxConcurrency);
29+
using var semaphore = new SemaphoreSlim(_maxConcurrency, _maxConcurrency);
3030
var tasks = new List<Task<TOutput>>();
3131

3232
try
@@ -36,7 +36,18 @@ public async IAsyncEnumerable<TOutput> ExecuteAsync()
3636
await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
3737

3838
var capturedItem = item;
39-
var task = ProcessItemAsync(capturedItem, semaphore, cancellationToken);
39+
// Use Task.Run to ensure parallelism and prevent blocking
40+
var task = Task.Run(async () =>
41+
{
42+
try
43+
{
44+
return await _taskSelector(capturedItem).ConfigureAwait(false);
45+
}
46+
finally
47+
{
48+
semaphore.Release();
49+
}
50+
}, cancellationToken);
4051
tasks.Add(task);
4152

4253
// Yield completed results
@@ -56,24 +67,19 @@ public async IAsyncEnumerable<TOutput> ExecuteAsync()
5667
}
5768
finally
5869
{
59-
semaphore.Dispose();
60-
}
61-
}
62-
63-
private async Task<TOutput> ProcessItemAsync(
64-
TInput item,
65-
SemaphoreSlim semaphore,
66-
CancellationToken cancellationToken)
67-
{
68-
try
69-
{
70-
// Yield to ensure we don't block the thread if _taskSelector is synchronous
71-
await Task.Yield();
72-
return await _taskSelector(item).ConfigureAwait(false);
73-
}
74-
finally
75-
{
76-
semaphore.Release();
70+
// Ensure all tasks complete before the using block disposes the semaphore
71+
// This handles cancellation or exception scenarios
72+
if (tasks.Count > 0)
73+
{
74+
try
75+
{
76+
await Task.WhenAll(tasks).ConfigureAwait(false);
77+
}
78+
catch
79+
{
80+
// Ignore exceptions here as they've already been handled
81+
}
82+
}
7783
}
7884
}
7985
}

EnumerableAsyncProcessor/RunnableProcessors/ParallelAsyncProcessor.cs

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,33 +16,31 @@ internal override async Task Process()
1616
// If no concurrency limit, process all tasks in parallel
1717
if (_maxConcurrency == null)
1818
{
19-
await Task.WhenAll(TaskWrappers.Select(taskWrapper => taskWrapper.Process(CancellationToken))).ConfigureAwait(false);
19+
// Use Task.Run to ensure all tasks start immediately on thread pool threads
20+
// This prevents synchronous code in user delegates from blocking other tasks
21+
await Task.WhenAll(TaskWrappers.Select(taskWrapper =>
22+
Task.Run(() => taskWrapper.Process(CancellationToken), CancellationToken)
23+
)).ConfigureAwait(false);
2024
return;
2125
}
2226

2327
// Use semaphore for concurrency throttling
2428
using var semaphore = new SemaphoreSlim(_maxConcurrency.Value, _maxConcurrency.Value);
2529

26-
var tasks = TaskWrappers.Select(async taskWrapper =>
30+
// Materialize tasks immediately to ensure they all start in parallel (up to concurrency limit)
31+
// Use Task.Run to prevent synchronous code from blocking thread pool threads
32+
var tasks = TaskWrappers.Select(taskWrapper => Task.Run(async () =>
2733
{
2834
await semaphore.WaitAsync(CancellationToken).ConfigureAwait(false);
2935
try
3036
{
31-
var task = taskWrapper.Process(CancellationToken);
32-
33-
// Fast-path for already completed tasks
34-
if (task.IsCompleted)
35-
{
36-
return;
37-
}
38-
39-
await task.ConfigureAwait(false);
37+
await taskWrapper.Process(CancellationToken).ConfigureAwait(false);
4038
}
4139
finally
4240
{
4341
semaphore.Release();
4442
}
45-
});
43+
}, CancellationToken)).ToList(); // Force immediate task creation
4644

4745
await Task.WhenAll(tasks).ConfigureAwait(false);
4846
}

EnumerableAsyncProcessor/RunnableProcessors/ParallelAsyncProcessor_1.cs

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,39 +16,31 @@ internal override async Task Process()
1616
// If no concurrency limit, process all tasks in parallel
1717
if (_maxConcurrency == null)
1818
{
19+
// Use Task.Run to ensure all tasks start immediately on thread pool threads
20+
// This prevents synchronous code in user delegates from blocking other tasks
1921
await Task.WhenAll(TaskWrappers.Select(taskWrapper =>
20-
{
21-
var task = taskWrapper.Process(CancellationToken);
22-
// Fast-path for already completed tasks
23-
if (task.IsCompleted)
24-
{
25-
}
26-
return task;
27-
})).ConfigureAwait(false);
22+
Task.Run(() => taskWrapper.Process(CancellationToken), CancellationToken)
23+
)).ConfigureAwait(false);
2824
return;
2925
}
3026

3127
// Use semaphore for concurrency throttling
3228
using var semaphore = new SemaphoreSlim(_maxConcurrency.Value, _maxConcurrency.Value);
3329

34-
var tasks = TaskWrappers.Select(async taskWrapper =>
30+
// Materialize tasks immediately to ensure they all start in parallel (up to concurrency limit)
31+
// Use Task.Run to prevent synchronous code from blocking thread pool threads
32+
var tasks = TaskWrappers.Select(taskWrapper => Task.Run(async () =>
3533
{
3634
await semaphore.WaitAsync(CancellationToken).ConfigureAwait(false);
3735
try
3836
{
39-
var task = taskWrapper.Process(CancellationToken);
40-
// Fast-path for already completed tasks
41-
if (task.IsCompleted)
42-
{
43-
return;
44-
}
45-
await task.ConfigureAwait(false);
37+
await taskWrapper.Process(CancellationToken).ConfigureAwait(false);
4638
}
4739
finally
4840
{
4941
semaphore.Release();
5042
}
51-
});
43+
}, CancellationToken)).ToList(); // Force immediate task creation
5244

5345
await Task.WhenAll(tasks).ConfigureAwait(false);
5446
}

EnumerableAsyncProcessor/RunnableProcessors/ResultProcessors/ResultParallelAsyncProcessor_1.cs

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,39 +16,31 @@ internal override async Task Process()
1616
// If no concurrency limit, process all tasks in parallel
1717
if (_maxConcurrency == null)
1818
{
19+
// Use Task.Run to ensure all tasks start immediately on thread pool threads
20+
// This prevents synchronous code in user delegates from blocking other tasks
1921
await Task.WhenAll(TaskWrappers.Select(taskWrapper =>
20-
{
21-
var task = taskWrapper.Process(CancellationToken);
22-
// Fast-path for already completed tasks
23-
if (task.IsCompleted)
24-
{
25-
}
26-
return task;
27-
})).ConfigureAwait(false);
22+
Task.Run(() => taskWrapper.Process(CancellationToken), CancellationToken)
23+
)).ConfigureAwait(false);
2824
return;
2925
}
3026

3127
// Use semaphore for concurrency throttling
3228
using var semaphore = new SemaphoreSlim(_maxConcurrency.Value, _maxConcurrency.Value);
3329

34-
var tasks = TaskWrappers.Select(async taskWrapper =>
30+
// Materialize tasks immediately to ensure they all start in parallel (up to concurrency limit)
31+
// Use Task.Run to prevent synchronous code from blocking thread pool threads
32+
var tasks = TaskWrappers.Select(taskWrapper => Task.Run(async () =>
3533
{
3634
await semaphore.WaitAsync(CancellationToken).ConfigureAwait(false);
3735
try
3836
{
39-
var task = taskWrapper.Process(CancellationToken);
40-
// Fast-path for already completed tasks
41-
if (task.IsCompleted)
42-
{
43-
return;
44-
}
45-
await task.ConfigureAwait(false);
37+
await taskWrapper.Process(CancellationToken).ConfigureAwait(false);
4638
}
4739
finally
4840
{
4941
semaphore.Release();
5042
}
51-
});
43+
}, CancellationToken)).ToList(); // Force immediate task creation
5244

5345
await Task.WhenAll(tasks).ConfigureAwait(false);
5446
}

EnumerableAsyncProcessor/RunnableProcessors/ResultProcessors/ResultParallelAsyncProcessor_2.cs

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,39 +16,31 @@ internal override async Task Process()
1616
// If no concurrency limit, process all tasks in parallel
1717
if (_maxConcurrency == null)
1818
{
19+
// Use Task.Run to ensure all tasks start immediately on thread pool threads
20+
// This prevents synchronous code in user delegates from blocking other tasks
1921
await Task.WhenAll(TaskWrappers.Select(taskWrapper =>
20-
{
21-
var task = taskWrapper.Process(CancellationToken);
22-
// Fast-path for already completed tasks
23-
if (task.IsCompleted)
24-
{
25-
}
26-
return task;
27-
})).ConfigureAwait(false);
22+
Task.Run(() => taskWrapper.Process(CancellationToken), CancellationToken)
23+
)).ConfigureAwait(false);
2824
return;
2925
}
3026

3127
// Use semaphore for concurrency throttling
3228
using var semaphore = new SemaphoreSlim(_maxConcurrency.Value, _maxConcurrency.Value);
3329

34-
var tasks = TaskWrappers.Select(async taskWrapper =>
30+
// Materialize tasks immediately to ensure they all start in parallel (up to concurrency limit)
31+
// Use Task.Run to prevent synchronous code from blocking thread pool threads
32+
var tasks = TaskWrappers.Select(taskWrapper => Task.Run(async () =>
3533
{
3634
await semaphore.WaitAsync(CancellationToken).ConfigureAwait(false);
3735
try
3836
{
39-
var task = taskWrapper.Process(CancellationToken);
40-
// Fast-path for already completed tasks
41-
if (task.IsCompleted)
42-
{
43-
return;
44-
}
45-
await task.ConfigureAwait(false);
37+
await taskWrapper.Process(CancellationToken).ConfigureAwait(false);
4638
}
4739
finally
4840
{
4941
semaphore.Release();
5042
}
51-
});
43+
}, CancellationToken)).ToList(); // Force immediate task creation
5244

5345
await Task.WhenAll(tasks).ConfigureAwait(false);
5446
}

EnumerableAsyncProcessor/TaskWrapper.cs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,7 @@ public async Task Process(CancellationToken cancellationToken)
2727

2828
try
2929
{
30-
// Yield to ensure we don't block the calling thread if TaskFactory is synchronous
31-
await Task.Yield();
30+
// Removed Task.Yield - parallelism is now handled at the processor level
3231
var task = TaskFactory.Invoke();
3332

3433
// Fast-path for already completed tasks
@@ -123,8 +122,7 @@ public async Task Process(CancellationToken cancellationToken)
123122

124123
try
125124
{
126-
// Yield to ensure we don't block the calling thread if TaskFactory is synchronous
127-
await Task.Yield();
125+
// Removed Task.Yield - parallelism is now handled at the processor level
128126
var task = TaskFactory.Invoke(Input);
129127

130128
// Fast-path for already completed tasks
@@ -222,8 +220,7 @@ public async Task Process(CancellationToken cancellationToken)
222220

223221
try
224222
{
225-
// Yield to ensure we don't block the calling thread if TaskFactory is synchronous
226-
await Task.Yield();
223+
// Removed Task.Yield - parallelism is now handled at the processor level
227224
var task = TaskFactory.Invoke(Input);
228225

229226
// Fast-path for already completed tasks
@@ -318,8 +315,7 @@ public async Task Process(CancellationToken cancellationToken)
318315

319316
try
320317
{
321-
// Yield to ensure we don't block the calling thread if TaskFactory is synchronous
322-
await Task.Yield();
318+
// Removed Task.Yield - parallelism is now handled at the processor level
323319
var task = TaskFactory.Invoke();
324320

325321
// Fast-path for already completed tasks

0 commit comments

Comments
 (0)