Skip to content

Commit 0343fe1

Browse files
author
Oren (electricessence)
committed
Improved exception handling for concurrent methods.
1 parent 4cce6b8 commit 0343fe1

File tree

7 files changed

+303
-85
lines changed

7 files changed

+303
-85
lines changed

Open.ChannelExtensions.Tests/BasicTests.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
namespace Open.ChannelExtensions.Tests
1010
{
11-
public static partial class BasicTests
11+
public static class BasicTests
1212
{
1313
const int testSize1 = 10000001;
1414
const int testSize2 = 30000001;
@@ -325,5 +325,6 @@ public static async Task Filter(int testSize)
325325
Assert.True(result.SequenceEqual(range.Where(i => i % 2 == 1)));
326326
}
327327

328+
328329
}
329330
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
using System;
2+
using System.Linq;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
using Xunit;
6+
7+
namespace Open.ChannelExtensions.Tests
8+
{
9+
public static class CancellationTests
10+
{
11+
12+
[Fact]
13+
public static async Task OperationCancellationPropagation()
14+
{
15+
int count = 0;
16+
var range = Enumerable.Range(0, 1000);
17+
using var tokenSource = new CancellationTokenSource();
18+
var token = tokenSource.Token;
19+
try
20+
{
21+
await range
22+
.ToChannel()
23+
.ReadAll(i =>
24+
{
25+
if (i == 500)
26+
{
27+
Interlocked.Increment(ref count);
28+
tokenSource.Cancel();
29+
}
30+
token.ThrowIfCancellationRequested();
31+
});
32+
}
33+
catch (Exception ex)
34+
{
35+
Assert.IsType<OperationCanceledException>(ex);
36+
}
37+
38+
Assert.Equal(1, count);
39+
}
40+
41+
[Fact]
42+
public static async Task TaskCancellationPropagationConcurrent()
43+
{
44+
const int testSize = 1000000;
45+
int total = 0;
46+
int count = 0;
47+
var range = Enumerable.Range(0, testSize);
48+
using var tokenSource = new CancellationTokenSource();
49+
var token = tokenSource.Token;
50+
try
51+
{
52+
await range
53+
.ToChannel()
54+
.ReadAllConcurrently(8, i =>
55+
{
56+
Interlocked.Increment(ref total);
57+
if (i == 500)
58+
{
59+
Interlocked.Increment(ref count);
60+
tokenSource.Cancel();
61+
}
62+
token.ThrowIfCancellationRequested();
63+
});
64+
}
65+
catch (Exception ex)
66+
{
67+
Assert.IsType<TaskCanceledException>(ex);
68+
}
69+
70+
Assert.Equal(1, count);
71+
Assert.NotEqual(testSize, total);
72+
}
73+
74+
75+
[Fact]
76+
public static async Task CancellationPropagationConcurrent()
77+
{
78+
const int testSize = 1000000;
79+
int total = 0;
80+
int count = 0;
81+
var range = Enumerable.Range(0, testSize);
82+
using var tokenSource = new CancellationTokenSource();
83+
var token = tokenSource.Token;
84+
try
85+
{
86+
await range
87+
.ToChannel()
88+
.ReadAllConcurrently(8, token, i =>
89+
{
90+
Interlocked.Increment(ref total);
91+
if (i == 500)
92+
{
93+
Interlocked.Increment(ref count);
94+
tokenSource.Cancel();
95+
}
96+
});
97+
}
98+
catch (Exception ex)
99+
{
100+
Assert.IsType<TaskCanceledException>(ex);
101+
}
102+
103+
Assert.Equal(1, count);
104+
Assert.NotEqual(testSize, total);
105+
106+
}
107+
108+
}
109+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
using System;
2+
using System.Linq;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
using Xunit;
6+
7+
namespace Open.ChannelExtensions.Tests
8+
{
9+
public static class ExceptionTests
10+
{
11+
12+
class TestException : Exception { }
13+
14+
[Fact]
15+
public static async Task ExceptionPropagation()
16+
{
17+
int count = 0;
18+
var range = Enumerable.Range(0, 1000);
19+
try
20+
{
21+
await range
22+
.ToChannel()
23+
.ReadAll(i =>
24+
{
25+
if (i == 500)
26+
{
27+
Interlocked.Increment(ref count);
28+
throw new TestException();
29+
}
30+
});
31+
}
32+
catch (Exception ex)
33+
{
34+
Assert.IsType<TestException>(ex);
35+
}
36+
37+
Assert.Equal(1, count);
38+
}
39+
40+
[Fact]
41+
public static async Task ExceptionPropagationConcurrent()
42+
{
43+
const int testSize = 1000000;
44+
int total = 0;
45+
int count = 0;
46+
var range = Enumerable.Range(0, testSize);
47+
try
48+
{
49+
await range
50+
.ToChannel()
51+
.ReadAllConcurrently(8, i =>
52+
{
53+
Interlocked.Increment(ref total);
54+
if (i == 500)
55+
{
56+
Interlocked.Increment(ref count);
57+
throw new TestException();
58+
}
59+
});
60+
}
61+
catch (Exception ex)
62+
{
63+
Assert.IsType<AggregateException>(ex);
64+
Assert.IsType<TestException>(((AggregateException)ex).InnerException);
65+
}
66+
67+
Assert.Equal(1, count);
68+
Assert.NotEqual(testSize, total);
69+
}
70+
}
71+
}

Open.ChannelExtensions/Documentation.xml

Lines changed: 60 additions & 56 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Open.ChannelExtensions/Extensions.ReadConcurrently.cs

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ public static partial class Extensions
1818
/// <param name="receiver">The async receiver function.</param>
1919
/// <param name="cancellationToken">An optional cancellation token.</param>
2020
/// <returns>A task that completes when no more reading is to be done.</returns>
21+
[System.Diagnostics.CodeAnalysis.SuppressMessage("Reliability", "CA2000:Dispose objects before losing scope", Justification = "Async scope.")]
2122
public static Task<long> ReadAllConcurrentlyAsync<T>(this ChannelReader<T> reader,
2223
int maxConcurrency,
2324
Func<T, ValueTask> receiver,
@@ -33,20 +34,39 @@ public static Task<long> ReadAllConcurrentlyAsync<T>(this ChannelReader<T> reade
3334
if (maxConcurrency == 1)
3435
return reader.ReadAllAsync(receiver, cancellationToken, true).AsTask();
3536

37+
var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
3638
var readers = new Task<long>[maxConcurrency];
3739
for (var r = 0; r < maxConcurrency; ++r)
38-
readers[r] = reader
39-
.ReadUntilCancelledAsync(cancellationToken, ParallelReceiver, true).AsTask();
40+
readers[r] = Read();
4041

4142
return Task
4243
.WhenAll(readers)
43-
.ContinueWith(
44-
t => t.Result.Sum(),
44+
.ContinueWith(t =>
45+
{
46+
tokenSource.Dispose();
47+
if (t.IsFaulted) return Task.FromException<long>(t.Exception);
48+
return Task.FromResult(t.Result.Sum());
49+
},
4550
CancellationToken.None,
46-
TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously,
47-
TaskScheduler.Current);
51+
TaskContinuationOptions.NotOnCanceled | TaskContinuationOptions.ExecuteSynchronously,
52+
TaskScheduler.Current)
53+
.Unwrap();
54+
55+
async Task<long> Read()
56+
{
57+
try
58+
{
59+
return await reader.ReadUntilCancelledAsync(tokenSource.Token, ParallelReceiver, true);
60+
}
61+
catch
62+
{
63+
tokenSource.Cancel();
64+
throw;
65+
}
66+
}
4867

4968
ValueTask ParallelReceiver(T item, long i) => receiver(item);
69+
5070
}
5171

5272
/// <summary>

0 commit comments

Comments
 (0)