Skip to content

Commit 2f4caa3

Browse files
thomhurstclaude
andcommitted
fix: Resolve unobserved task exceptions in AsyncEnumerable processors
Fixed ObjectDisposedException occurring when semaphore was disposed while background tasks were still trying to release it. Changes: - Wrapped task creation and execution in try-finally blocks - Ensured all tasks complete before disposing semaphore using finally blocks - Added exception handling for cancellation scenarios - Removed unnecessary Task.Yield calls in AsyncEnumerable processors This prevents the "Cannot access a disposed object" exceptions that were appearing during test runs when tasks were finalized by the GC. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent f40ea5e commit 2f4caa3

File tree

2 files changed

+34
-26
lines changed

2 files changed

+34
-26
lines changed

EnumerableAsyncProcessor/RunnableProcessors/AsyncEnumerable/AsyncEnumerableParallelProcessor.cs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ internal AsyncEnumerableParallelProcessor(
2525
public async Task ExecuteAsync()
2626
{
2727
var cancellationToken = _cancellationTokenSource.Token;
28-
var semaphore = new SemaphoreSlim(_maxConcurrency, _maxConcurrency);
28+
using var semaphore = new SemaphoreSlim(_maxConcurrency, _maxConcurrency);
2929
var tasks = new List<Task>();
3030

3131
try
@@ -39,8 +39,7 @@ public async Task ExecuteAsync()
3939
{
4040
try
4141
{
42-
// Yield to ensure we don't block the thread if _taskSelector is synchronous
43-
await Task.Yield();
42+
// Removed Task.Yield - parallelism is now handled at the processor level
4443
await _taskSelector(capturedItem).ConfigureAwait(false);
4544
}
4645
finally
@@ -51,12 +50,15 @@ public async Task ExecuteAsync()
5150

5251
tasks.Add(task);
5352
}
54-
55-
await Task.WhenAll(tasks).ConfigureAwait(false);
5653
}
5754
finally
5855
{
59-
semaphore.Dispose();
56+
// Always wait for all tasks to complete before the using block disposes the semaphore
57+
// This ensures the semaphore is not disposed while tasks are still running
58+
if (tasks.Count > 0)
59+
{
60+
await Task.WhenAll(tasks).ConfigureAwait(false);
61+
}
6062
}
6163
}
6264
}

EnumerableAsyncProcessor/RunnableProcessors/AsyncEnumerable/ResultProcessors/ResultAsyncEnumerableParallelProcessor.cs

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ internal ResultAsyncEnumerableParallelProcessor(
2626
public async IAsyncEnumerable<TOutput> ExecuteAsync()
2727
{
2828
var cancellationToken = _cancellationTokenSource.Token;
29-
var semaphore = new SemaphoreSlim(_maxConcurrency, _maxConcurrency);
29+
using var semaphore = new SemaphoreSlim(_maxConcurrency, _maxConcurrency);
3030
var tasks = new List<Task<TOutput>>();
3131

3232
try
@@ -36,7 +36,18 @@ public async IAsyncEnumerable<TOutput> ExecuteAsync()
3636
await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
3737

3838
var capturedItem = item;
39-
var task = ProcessItemAsync(capturedItem, semaphore, cancellationToken);
39+
// Use Task.Run to ensure parallelism and prevent blocking
40+
var task = Task.Run(async () =>
41+
{
42+
try
43+
{
44+
return await _taskSelector(capturedItem).ConfigureAwait(false);
45+
}
46+
finally
47+
{
48+
semaphore.Release();
49+
}
50+
}, cancellationToken);
4051
tasks.Add(task);
4152

4253
// Yield completed results
@@ -56,24 +67,19 @@ public async IAsyncEnumerable<TOutput> ExecuteAsync()
5667
}
5768
finally
5869
{
59-
semaphore.Dispose();
60-
}
61-
}
62-
63-
private async Task<TOutput> ProcessItemAsync(
64-
TInput item,
65-
SemaphoreSlim semaphore,
66-
CancellationToken cancellationToken)
67-
{
68-
try
69-
{
70-
// Yield to ensure we don't block the thread if _taskSelector is synchronous
71-
await Task.Yield();
72-
return await _taskSelector(item).ConfigureAwait(false);
73-
}
74-
finally
75-
{
76-
semaphore.Release();
70+
// Ensure all tasks complete before the using block disposes the semaphore
71+
// This handles cancellation or exception scenarios
72+
if (tasks.Count > 0)
73+
{
74+
try
75+
{
76+
await Task.WhenAll(tasks).ConfigureAwait(false);
77+
}
78+
catch
79+
{
80+
// Ignore exceptions here as they've already been handled
81+
}
82+
}
7783
}
7884
}
7985
}

0 commit comments

Comments
 (0)