Skip to content

Commit 75eacc0

Browse files
author
Oren (electricessence)
committed
Fixed potential race condition issues with cancellation and exception handling.
1 parent bdc82d4 commit 75eacc0

File tree

3 files changed

+46
-30
lines changed

3 files changed

+46
-30
lines changed

Open.ChannelExtensions.Tests/ExceptionTests.cs

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -40,29 +40,35 @@ await range
4040
[Fact]
4141
public static async Task ExceptionPropagationConcurrent()
4242
{
43-
const int testSize = 1000000;
43+
const int testSize = 100000000;
4444
int total = 0;
4545
int count = 0;
4646
var range = Enumerable.Range(0, testSize);
47-
try
47+
await Assert.ThrowsAsync<AggregateException>(async () =>
4848
{
49-
await range
50-
.ToChannel()
51-
.ReadAllConcurrently(8, i =>
52-
{
53-
Interlocked.Increment(ref total);
54-
if (i == 500)
49+
try
50+
{
51+
52+
await range
53+
.ToChannel()
54+
.ReadAllConcurrently(8, i =>
5555
{
56-
Interlocked.Increment(ref count);
57-
throw new TestException();
58-
}
59-
});
60-
}
61-
catch (Exception ex)
62-
{
63-
Assert.IsType<AggregateException>(ex);
64-
Assert.IsType<TestException>(((AggregateException)ex).InnerException);
65-
}
56+
Interlocked.Increment(ref total);
57+
if (i == 500)
58+
{
59+
Interlocked.Increment(ref count);
60+
throw new TestException();
61+
}
62+
});
63+
64+
}
65+
catch (Exception ex)
66+
{
67+
Assert.IsType<AggregateException>(ex);
68+
Assert.IsType<TestException>(((AggregateException)ex).InnerException);
69+
throw;
70+
}
71+
});
6672

6773
Assert.Equal(1, count);
6874
Assert.NotEqual(testSize, total);

Open.ChannelExtensions/Extensions.Read.cs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -87,22 +87,30 @@ public static async ValueTask<long> ReadUntilCancelledAsync<T>(this ChannelReade
8787
if (deferredExecution)
8888
await Task.Yield();
8989

90+
9091
long index = 0;
91-
do
92+
try
9293
{
93-
var next = new ValueTask();
94-
while (
95-
!cancellationToken.IsCancellationRequested
96-
&& reader.TryRead(out var item))
94+
do
9795
{
96+
var next = new ValueTask();
97+
while (
98+
!cancellationToken.IsCancellationRequested
99+
&& reader.TryRead(out var item))
100+
{
101+
await next.ConfigureAwait(false);
102+
next = receiver(item, index++);
103+
}
98104
await next.ConfigureAwait(false);
99-
next = receiver(item, index++);
100105
}
101-
await next.ConfigureAwait(false);
106+
while (
107+
!cancellationToken.IsCancellationRequested
108+
&& await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false));
109+
}
110+
catch (TaskCanceledException)
111+
{
112+
// In case WaitToReadAsync is cancelled.
102113
}
103-
while (
104-
!cancellationToken.IsCancellationRequested
105-
&& await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false));
106114

107115
return index;
108116
}

Open.ChannelExtensions/Extensions.ReadConcurrently.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public static Task<long> ReadAllConcurrentlyAsync<T>(this ChannelReader<T> reade
3535
return reader.ReadAllAsync(receiver, cancellationToken, true).AsTask();
3636

3737
var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
38+
var token = tokenSource.Token;
3839
var readers = new Task<long>[maxConcurrency];
3940
for (var r = 0; r < maxConcurrency; ++r)
4041
readers[r] = Read();
@@ -45,18 +46,19 @@ public static Task<long> ReadAllConcurrentlyAsync<T>(this ChannelReader<T> reade
4546
{
4647
tokenSource.Dispose();
4748
if (t.IsFaulted) return Task.FromException<long>(t.Exception);
49+
if (t.IsCanceled) return Task.FromCanceled<long>(token);
4850
return Task.FromResult(t.Result.Sum());
4951
},
5052
CancellationToken.None,
51-
TaskContinuationOptions.NotOnCanceled | TaskContinuationOptions.ExecuteSynchronously,
53+
TaskContinuationOptions.ExecuteSynchronously,
5254
TaskScheduler.Current)
5355
.Unwrap();
5456

5557
async Task<long> Read()
5658
{
5759
try
5860
{
59-
return await reader.ReadUntilCancelledAsync(tokenSource.Token, ParallelReceiver, true);
61+
return await reader.ReadUntilCancelledAsync(token, ParallelReceiver, true);
6062
}
6163
catch
6264
{

0 commit comments

Comments
 (0)