Skip to content

Commit 018def4

Browse files
Enhanced async cancellation handling
- Updated the expected exception type in `CancellationTests` from `OperationCanceledException` to `TaskCanceledException`, reflecting a more accurate expectation for task cancellation scenarios. - Added a new extension method `WaitToReadOrCancelAsync` in `Extensions.Read.cs` to improve support for asynchronous read operations with cancellation. - Refactored existing reading logic to utilize the new `WaitToReadOrCancelAsync` method, streamlining cancellation handling. - Incremented the version of `Open.ChannelExtensions` to 8.4.0, indicating the introduction of new features and improvements.
1 parent 85230ca commit 018def4

File tree

3 files changed

+45
-40
lines changed

3 files changed

+45
-40
lines changed

Open.ChannelExtensions.Tests/CancellationTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ await range
5757
}
5858
catch (Exception ex)
5959
{
60-
Assert.IsType<OperationCanceledException>(ex);
60+
Assert.IsType<TaskCanceledException>(ex);
6161
}
6262

6363
Assert.Equal(1, count);

Open.ChannelExtensions/Extensions.Read.cs

Lines changed: 43 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,24 @@ public static partial class Extensions
66
{
77
private const string MustBeAtLeast1 = "Must be at least 1.";
88

9+
/// <summary>
10+
/// Waits for read to complete or the cancellation token to be cancelled.
11+
/// </summary>
12+
public static async ValueTask<bool> WaitToReadOrCancelAsync<T>(this ChannelReader<T> reader, CancellationToken cancellationToken)
13+
{
14+
if (reader is null) throw new ArgumentNullException(nameof(reader));
15+
Contract.EndContractBlock();
16+
17+
try
18+
{
19+
return await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false);
20+
}
21+
catch (OperationCanceledException)
22+
{
23+
return false;
24+
}
25+
}
26+
927
/// <summary>
1028
/// Creates an enumerable that will read from the channel until no more are available for read.
1129
/// </summary>
@@ -117,38 +135,32 @@ public static async ValueTask<long> ReadUntilCancelledAsync<T>(this ChannelReade
117135
await Task.Yield();
118136

119137
long index = 0;
120-
try
138+
139+
if (cancellationToken.CanBeCanceled)
121140
{
122-
if (cancellationToken.CanBeCanceled)
141+
do
123142
{
124-
do
125-
{
126-
while (
127-
!cancellationToken.IsCancellationRequested
128-
&& reader.TryRead(out T? item))
129-
{
130-
await receiver(item, index++).ConfigureAwait(false);
131-
}
132-
}
133143
while (
134144
!cancellationToken.IsCancellationRequested
135-
&& await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false));
136-
}
137-
else
138-
{
139-
do
145+
&& reader.TryRead(out T? item))
140146
{
141-
while (reader.TryRead(out T? item))
142-
{
143-
await receiver(item, index++).ConfigureAwait(false);
144-
}
147+
await receiver(item, index++).ConfigureAwait(false);
145148
}
146-
while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false));
147149
}
150+
while (
151+
!cancellationToken.IsCancellationRequested
152+
&& await reader.WaitToReadOrCancelAsync(cancellationToken).ConfigureAwait(false));
148153
}
149-
catch (OperationCanceledException)
154+
else
150155
{
151-
// In case WaitToReadAsync is cancelled.
156+
do
157+
{
158+
while (reader.TryRead(out T? item))
159+
{
160+
await receiver(item, index++).ConfigureAwait(false);
161+
}
162+
}
163+
while (await reader.WaitToReadOrCancelAsync(cancellationToken).ConfigureAwait(false));
152164
}
153165

154166
return index;
@@ -373,27 +385,20 @@ public static async ValueTask ReadAllAsEnumerablesAsync<T>(this ChannelReader<T>
373385
if (deferredExecution)
374386
await Task.Yield();
375387

376-
try
388+
if (cancellationToken.CanBeCanceled)
377389
{
378-
if (cancellationToken.CanBeCanceled)
379-
{
380-
while (
381-
!cancellationToken.IsCancellationRequested
382-
&& await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
383-
{
384-
await receiver(reader.ReadAvailable(cancellationToken)).ConfigureAwait(false);
385-
}
386-
return;
387-
}
388-
389-
while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
390+
while (
391+
!cancellationToken.IsCancellationRequested
392+
&& await reader.WaitToReadOrCancelAsync(cancellationToken).ConfigureAwait(false))
390393
{
391394
await receiver(reader.ReadAvailable(cancellationToken)).ConfigureAwait(false);
392395
}
396+
return;
393397
}
394-
catch (OperationCanceledException)
398+
399+
while (await reader.WaitToReadOrCancelAsync(cancellationToken).ConfigureAwait(false))
395400
{
396-
// In case WaitToReadAsync is cancelled.
401+
await receiver(reader.ReadAvailable(cancellationToken)).ConfigureAwait(false);
397402
}
398403
}
399404

Open.ChannelExtensions/Open.ChannelExtensions.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
<RepositoryType>git</RepositoryType>
2323
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
2424
<GenerateDocumentationFile>true</GenerateDocumentationFile>
25-
<Version>8.3.0</Version>
25+
<Version>8.4.0</Version>
2626
<PackageReleaseNotes>Added .Merge, .PipeAsync, and .PropagateCompletion extensions.</PackageReleaseNotes>
2727
<PackageLicenseExpression>MIT</PackageLicenseExpression>
2828
<PublishRepositoryUrl>true</PublishRepositoryUrl>

0 commit comments

Comments
 (0)