Skip to content

Commit 64dbf4a

Browse files
Refactor and update project dependencies
Refactored several classes for improved readability and maintainability: - `BasicTests` class: Fixed formatting issues in `BasicTests.cs`. - `PipelineExceptionTests` class: Changed from static to instance-based in `PipelineExceptionTests.cs`. - `Extensions.Join.cs`: Removed conditional compilation directives. - `Extensions.Merge.cs` and `Extensions.Read.cs`: Minor formatting improvements. - `Extensions.Source.cs` and `Extensions.Write.cs`: Added XML documentation comments. - `Extensions._.cs`: Reformatted continuation task in `PropagateCompletion` method. - `BufferingChannelReader.cs`: Ensured `_completion` field is set correctly and handled null buffer cases. - `MergingChannelReader.cs`: Minor formatting change. Updated project files: - `Open.ChannelExtensions.Tests.csproj`: Used wildcard versions for package references. - `Open.ChannelExtensions.csproj`: Incremented version to `8.5.0` and updated `Microsoft.Bcl.AsyncInterfaces` to `8.0.0`.
1 parent 249784e commit 64dbf4a

12 files changed

+83
-60
lines changed

Open.ChannelExtensions.Tests/BasicTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -496,7 +496,8 @@ public static async Task PipeFilterAsyncTest()
496496
var source = Enumerable.Range(0, total).ToChannel(300);
497497

498498
var evenFilter = source
499-
.PipeFilterAsync(out var unmatched, 10, 100, static async e => {
499+
.PipeFilterAsync(out var unmatched, 10, 100, static async e =>
500+
{
500501
await Task.Yield();
501502
return e % 2 == 0;
502503
})
@@ -545,5 +546,4 @@ await Assert.ThrowsAsync<OperationCanceledException>(
545546
_ = await reader.ReadAll(_ => { });
546547
});
547548
}
548-
549549
}

Open.ChannelExtensions.Tests/Open.ChannelExtensions.Tests.csproj

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,14 @@
1212
</ItemGroup>
1313

1414
<ItemGroup>
15-
<PackageReference Include="FluentAssertions" Version="6.12.0" />
16-
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.8.0" />
17-
<PackageReference Include="xunit" Version="2.6.6" />
18-
<PackageReference Include="xunit.runner.visualstudio" Version="2.5.6">
15+
<PackageReference Include="FluentAssertions" Version="6.*" />
16+
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.*" />
17+
<PackageReference Include="xunit" Version="2.*" />
18+
<PackageReference Include="xunit.runner.visualstudio" Version="2.*">
1919
<PrivateAssets>all</PrivateAssets>
2020
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
2121
</PackageReference>
22-
<PackageReference Include="coverlet.collector" Version="6.0.0">
22+
<PackageReference Include="coverlet.collector" Version="6.*">
2323
<PrivateAssets>all</PrivateAssets>
2424
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
2525
</PackageReference>
Lines changed: 65 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,64 @@
1-
namespace Open.ChannelExtensions.Tests;
2-
public static class PipelineExceptionTests
1+
using System.Threading.Tasks;
2+
using System.Xml.Linq;
3+
4+
namespace Open.ChannelExtensions.Tests;
5+
public class PipelineExceptionTests
36
{
47
const int BatchSize = 20;
58
const int Elements = 100;
9+
readonly Channel<int> _channel;
10+
int _thrown = -2;
11+
12+
public PipelineExceptionTests()
13+
{
14+
_channel = Channel.CreateBounded<int>(10000);
15+
for (var i = 0; i < 100; i++)
16+
{
17+
if (!_channel.Writer.TryWrite(i))
18+
throw new Exception("Failed to write " + i);
19+
}
20+
}
21+
22+
Func<int, int> CreateThrowIfEqual(int elementToThrow) => element =>
23+
{
24+
if (elementToThrow != -1 && element != elementToThrow)
25+
return element;
26+
_thrown = element;
27+
throw new Exception("Thrown at " + element);
28+
};
29+
30+
ChannelReader<int> PrepareStage1(int elementToThrow) =>
31+
_channel.Reader
32+
.Pipe(1, CreateThrowIfEqual(elementToThrow))
33+
.Pipe(2, evt => evt * 2);
34+
35+
async Task AssertException(ValueTask<long> task, int elementToThrow)
36+
{
37+
if (elementToThrow == Elements)
38+
_channel.Writer.Complete(new Exception());
39+
else
40+
_channel.Writer.Complete();
41+
42+
await Assert.ThrowsAsync<AggregateException>(async () => await task);
43+
await Assert.ThrowsAsync<ChannelClosedException>(async () => await _channel.CompleteAsync());
44+
}
645

746
[Theory]
47+
[InlineData(0)]
48+
[InlineData(1)]
849
[InlineData(BatchSize - 1)]
950
[InlineData(BatchSize)]
1051
[InlineData(BatchSize + 1)]
1152
[InlineData(-1)]
1253
[InlineData(Elements)]
13-
public static async Task Regular(int elementToThrow)
54+
public Task Regular(int elementToThrow)
1455
{
15-
var channel = Channel.CreateBounded<int>(10000);
16-
17-
for (int i = 0; i < Elements; i++)
18-
{
19-
await channel.Writer.WriteAsync(i);
20-
}
21-
22-
var task = channel.Reader
23-
.Pipe(1, element => elementToThrow == -1 || element == elementToThrow ? throw new Exception() : element)
24-
.Pipe(2, evt => evt * 2)
56+
var task = PrepareStage1(elementToThrow)
2557
//.Batch(20)
2658
.PipeAsync(1, evt => new ValueTask<int>(evt))
2759
.ReadAll(_ => { });
2860

29-
if (elementToThrow == Elements)
30-
channel.Writer.Complete(new Exception());
31-
else
32-
channel.Writer.Complete();
33-
34-
await Assert.ThrowsAsync<AggregateException>(async () => await task);
35-
await Assert.ThrowsAsync<ChannelClosedException>(async () => await channel.CompleteAsync());
61+
return AssertException(task, elementToThrow);
3662
}
3763

3864
[Theory]
@@ -42,28 +68,31 @@ public static async Task Regular(int elementToThrow)
4268
[InlineData(BatchSize)]
4369
[InlineData(BatchSize + 1)]
4470
[InlineData(-1)]
45-
public static async Task Batched(int elementToThrow)
71+
[InlineData(Elements)]
72+
public Task Batched(int elementToThrow)
4673
{
47-
var channel = Channel.CreateBounded<int>(10000);
74+
var task = PrepareStage1(elementToThrow)
75+
.Batch(20)
76+
.ReadAll(_ => { });
4877

49-
for (int i = 0; i < Elements; i++)
50-
{
51-
await channel.Writer.WriteAsync(i);
52-
}
78+
return AssertException(task, elementToThrow);
79+
}
5380

54-
var task = channel.Reader
55-
.Pipe(1, element => elementToThrow == -1 || element == elementToThrow ? throw new Exception() : element)
56-
.Pipe(2, evt => evt * 2)
81+
[Theory]
82+
[InlineData(0)]
83+
[InlineData(1)]
84+
[InlineData(BatchSize - 1)]
85+
[InlineData(BatchSize)]
86+
[InlineData(BatchSize + 1)]
87+
[InlineData(-1)]
88+
[InlineData(Elements)]
89+
public Task BatchPiped(int elementToThrow)
90+
{
91+
var task = PrepareStage1(elementToThrow)
5792
.Batch(20)
5893
.PipeAsync(1, evt => new ValueTask<List<int>>(evt))
5994
.ReadAll(_ => { });
6095

61-
if (elementToThrow == Elements)
62-
channel.Writer.Complete(new Exception());
63-
else
64-
channel.Writer.Complete();
65-
66-
await Assert.ThrowsAsync<AggregateException>(async () => await task);
67-
await Assert.ThrowsAsync<ChannelClosedException>(async () => await channel.CompleteAsync());
96+
return AssertException(task, elementToThrow);
6897
}
6998
}

Open.ChannelExtensions/Extensions.Join.cs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,6 @@ public static ChannelReader<T> Join<T>(this ChannelReader<List<T>> source, bool
7878
public static ChannelReader<T> Join<T>(this ChannelReader<T[]> source, bool singleReader = false)
7979
=> new JoiningChannelReader<T[], T>(source, singleReader);
8080

81-
#if NETSTANDARD2_0
82-
#else
8381
/// <summary>
8482
/// Joins collections of the same type into a single channel reader in the order provided.
8583
/// </summary>
@@ -126,5 +124,4 @@ await writer
126124
}
127125
}
128126
}
129-
#endif
130127
}

Open.ChannelExtensions/Extensions.Merge.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public static MergingChannelReader<T> Merge<T>(
3535
if (primary is MergingChannelReader<T> mcr)
3636
return mcr.Merge(secondary, others);
3737

38-
if(others is null || others.Length == 0)
38+
if (others is null || others.Length == 0)
3939
return new MergingChannelReader<T>(ImmutableArray.Create(primary, secondary));
4040

4141
var builder = ImmutableArray.CreateBuilder<ChannelReader<T>>(2 + others.Length);

Open.ChannelExtensions/Extensions.Read.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@ public static async ValueTask<List<T>> ReadBatchAsync<T>(this ChannelReader<T> r
101101

102102
if (results.Count == max)
103103
return results;
104-
105104
}
106105
while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false));
107106
}

Open.ChannelExtensions/Extensions.Source.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ public static ChannelReader<T> Source<T>(
254254
TextReader source,
255255
CancellationToken cancellationToken)
256256
=> Source(target, source, out _, false, cancellationToken);
257-
257+
258258
/// <inheritdoc cref="SourceAsync{TWrite, TRead}(Channel{TWrite, TRead}, IEnumerable{ValueTask{TWrite}}, out ValueTask{long}, bool, CancellationToken)"/>
259259
public static ChannelReader<TRead> Source<TWrite, TRead>(
260260
this Channel<TWrite, TRead> target,

Open.ChannelExtensions/Extensions.Write.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ public static ValueTask<long> WriteAllLines(
308308
bool complete,
309309
CancellationToken cancellationToken)
310310
=> WriteAllLines(target, source, complete, false, cancellationToken);
311-
311+
312312
/// <summary>
313313
/// Asynchronously writes all entries from the source to the channel.
314314
/// </summary>

Open.ChannelExtensions/Extensions._.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,8 @@ public static ChannelReader<TSource> PropagateCompletion<TSource, TTarget>(
103103
if (cancellationToken.IsCancellationRequested)
104104
return source;
105105

106-
source.Completion.ContinueWith(t => {
106+
source.Completion.ContinueWith(t =>
107+
{
107108
if (t.IsFaulted)
108109
target.TryComplete(t.Exception);
109110
else
@@ -336,7 +337,6 @@ public static ChannelReader<T> ToChannel<T>(this IEnumerable<T> source,
336337
=> CreateChannel<T>(capacity, singleReader)
337338
.Source(source, deferredExecution, cancellationToken);
338339

339-
340340
/// <summary>
341341
/// Writes all entries from the source to a channel and calls complete when finished.
342342
/// </summary>

Open.ChannelExtensions/Open.ChannelExtensions.csproj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
<RepositoryType>git</RepositoryType>
2323
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
2424
<GenerateDocumentationFile>true</GenerateDocumentationFile>
25-
<Version>8.4.3</Version>
25+
<Version>8.5.0</Version>
2626
<PackageReleaseNotes>Added .Merge, .PipeAsync, and .PropagateCompletion extensions.</PackageReleaseNotes>
2727
<PackageLicenseExpression>MIT</PackageLicenseExpression>
2828
<PublishRepositoryUrl>true</PublishRepositoryUrl>
@@ -55,7 +55,7 @@
5555
</ItemGroup>
5656

5757
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0' ">
58-
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="6.0.0" />
58+
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="8.0.0" />
5959
</ItemGroup>
6060

6161
<!-- Disable the nullable warnings when compiling for .NET Standard 2.0 -->

0 commit comments

Comments
 (0)