Skip to content

Commit ab3697a

Browse files
Fixed issue with inconsistent exception handling when batching.
1 parent 7fd2548 commit ab3697a

File tree

3 files changed

+85
-4
lines changed

3 files changed

+85
-4
lines changed
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading.Tasks;
4+
using System.Threading.Channels;
5+
using Xunit;
6+
7+
namespace Open.ChannelExtensions.Tests;
8+
public static class PipelineExceptionTests
9+
{
10+
const int BatchSize = 20;
11+
const int Elements = 100;
12+
13+
[Theory]
14+
[InlineData(BatchSize - 1)]
15+
[InlineData(BatchSize)]
16+
[InlineData(BatchSize + 1)]
17+
[InlineData(-1)]
18+
[InlineData(Elements)]
19+
public static async Task Regular(int elementToThrow)
20+
{
21+
var channel = Channel.CreateBounded<int>(10000);
22+
23+
for (int i = 0; i < Elements; i++)
24+
{
25+
await channel.Writer.WriteAsync(i);
26+
}
27+
28+
var task = channel.Reader
29+
.Pipe(1, element => elementToThrow == -1 || element == elementToThrow ? throw new Exception() : element)
30+
.Pipe(2, evt => evt * 2)
31+
//.Batch(20)
32+
.PipeAsync(1, evt => new ValueTask<int>(evt))
33+
.ReadAll(_ => { });
34+
35+
if(elementToThrow == Elements)
36+
channel.Writer.Complete(new Exception());
37+
else
38+
channel.Writer.Complete();
39+
40+
await Assert.ThrowsAsync<AggregateException>(async () => await task);
41+
}
42+
43+
[Theory]
44+
[InlineData(0)]
45+
[InlineData(1)]
46+
[InlineData(BatchSize - 1)]
47+
[InlineData(BatchSize)]
48+
[InlineData(BatchSize + 1)]
49+
[InlineData(-1)]
50+
public static async Task Batched(int elementToThrow)
51+
{
52+
var channel = Channel.CreateBounded<int>(10000);
53+
54+
for (int i = 0; i < Elements; i++)
55+
{
56+
await channel.Writer.WriteAsync(i);
57+
}
58+
59+
var task = channel.Reader
60+
.Pipe(1, element => elementToThrow == -1 || element == elementToThrow ? throw new Exception() : element)
61+
.Pipe(2, evt => evt * 2)
62+
.Batch(20)
63+
.PipeAsync(1, evt => new ValueTask<List<int>>(evt))
64+
.ReadAll(_ => { });
65+
66+
if(elementToThrow == Elements)
67+
channel.Writer.Complete(new Exception());
68+
else
69+
channel.Writer.Complete();
70+
71+
await Assert.ThrowsAsync<AggregateException>(async () => await task);
72+
}
73+
}

Open.ChannelExtensions/BufferingChannelReader.cs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,13 +99,21 @@ public override bool TryRead(out TOut item)
9999
/// <inheritdoc />
100100
public override ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)
101101
{
102-
if (Buffer?.Reader.Completion.IsCompleted != false)
103-
return new ValueTask<bool>(false);
102+
var buffer = Buffer;
103+
if(buffer is null) return new ValueTask<bool>(false);
104+
105+
var completion = buffer.Reader.Completion;
106+
if (completion.IsCompleted)
107+
{
108+
return completion.IsFaulted
109+
? new ValueTask<bool>(Task.FromException<bool>(completion.Exception))
110+
: new ValueTask<bool>(false);
111+
}
104112

105113
if (cancellationToken.IsCancellationRequested)
106114
return new ValueTask<bool>(Task.FromCanceled<bool>(cancellationToken));
107115

108-
ValueTask<bool> b = Buffer.Reader.WaitToReadAsync(cancellationToken);
116+
ValueTask<bool> b = buffer.Reader.WaitToReadAsync(cancellationToken);
109117
return b.IsCompleted ? b : WaitToReadAsyncCore(b, cancellationToken);
110118
}
111119

Open.ChannelExtensions/Open.ChannelExtensions.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
<RepositoryType>git</RepositoryType>
2323
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
2424
<GenerateDocumentationFile>true</GenerateDocumentationFile>
25-
<Version>6.2.0</Version>
25+
<Version>6.2.1</Version>
2626
<PackageReleaseNotes></PackageReleaseNotes>
2727
<PackageLicenseExpression>MIT</PackageLicenseExpression>
2828
<PublishRepositoryUrl>true</PublishRepositoryUrl>

0 commit comments

Comments
 (0)