Skip to content

Commit 8e83e5f

Browse files
author
Oren (electricessence)
committed
Checkpoint: Revised APIs, peformance rework, deferredExecution option, singleReader option.
1 parent 3806465 commit 8e83e5f

22 files changed

+2697
-268
lines changed
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>netcoreapp3.0</TargetFramework>
6+
</PropertyGroup>
7+
8+
<ItemGroup>
9+
<ProjectReference Include="..\Open.ChannelExtensions.Tests\Open.ChannelExtensions.Tests.csproj" />
10+
</ItemGroup>
11+
12+
</Project>

Open.ChannelExtensions.Tests/Program.cs renamed to Open.ChannelExtensions.ComparisonTests/Program.cs

Lines changed: 17 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,25 @@
1-
using System;
2-
using System.Collections.Generic;
1+
using Open.ChannelExtensions.Tests;
2+
using System;
33
using System.Diagnostics;
44
using System.Linq;
55
using System.Threading.Tasks;
66
using System.Threading.Tasks.Dataflow;
77

8-
namespace Open.ChannelExtensions.Tests
8+
namespace Open.ChannelExtensions.ComparisonTests
99
{
1010
class Program
1111
{
1212
static async Task Main()
1313
{
1414
const int repeat = 50;
1515
const int concurrency = 4;
16+
const int testSize = 30000001;
17+
1618

1719
{
1820
Console.WriteLine("Standard DataFlow operation test...");
19-
var sw = Stopwatch.StartNew();
2021
var block = new ActionBlock<int>(async i => await Delay(i));
22+
var sw = Stopwatch.StartNew();
2123
foreach (var i in Enumerable.Range(0, repeat))
2224
block.Post(i);
2325
block.Complete();
@@ -27,56 +29,24 @@ static async Task Main()
2729
Console.WriteLine();
2830
}
2931

30-
{
31-
Console.WriteLine("Batch + join test 1...");
32-
var sw = Stopwatch.StartNew();
33-
var range = Enumerable
34-
.Range(0, 10000000);
35-
36-
var result = new List<int>(10000000);
37-
38-
var total = await range
39-
.ToChannel()
40-
.Batch(5000)
41-
.Join()
42-
.ReadAll(i=> result.Add(i));
43-
44-
sw.Stop();
45-
Debug.Assert(result.SequenceEqual(range));
46-
Console.WriteLine(sw.Elapsed);
47-
Console.WriteLine();
48-
}
49-
50-
{
51-
Console.WriteLine("Batch + join test 2...");
52-
var sw = Stopwatch.StartNew();
53-
var range = Enumerable
54-
.Range(0, 10000000);
55-
56-
var result = new List<int>(10000000);
57-
58-
var total = await range
59-
.ToChannel()
60-
.Batch(50)
61-
.Join()
62-
.ReadAll(i => result.Add(i));
63-
64-
sw.Stop();
65-
Debug.Assert(result.SequenceEqual(range));
66-
Console.WriteLine(sw.Elapsed);
67-
Console.WriteLine();
68-
}
32+
await BasicTests.ReadAll(testSize);
33+
await BasicTests.ReadAllAsync(testSize);
34+
await BasicTests.BatchThenJoin(testSize, 5001);
35+
await BasicTests.BatchJoin(testSize, 50);
6936

7037
{
7138
Console.WriteLine("Standard Channel filter test...");
72-
var sw = Stopwatch.StartNew();
73-
var total = await Enumerable
39+
var source = Enumerable
7440
.Repeat((Func<int, ValueTask<int>>)Delay, repeat)
75-
.Select((t, i) => t(i))
41+
.Select((t, i) => t(i));
42+
43+
var sw = Stopwatch.StartNew();
44+
var total = await source
7645
.ToChannelAsync(singleReader: true)
7746
.Filter(i => i % 2 == 0)
7847
.ReadAll(Dummy);
7948
sw.Stop();
49+
8050
Debug.Assert(total == repeat / 2);
8151
Console.WriteLine(sw.Elapsed);
8252
Console.WriteLine();
@@ -141,7 +111,7 @@ await Enumerable
141111
{
142112
Console.WriteLine("Async Enumerable test...");
143113
var sw = Stopwatch.StartNew();
144-
await foreach(var e in Enumerable
114+
await foreach (var e in Enumerable
145115
.Repeat((Func<int, ValueTask<int>>)Delay, repeat)
146116
.Select((t, i) => t(i))
147117
.ToChannelAsync()
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Diagnostics;
4+
using System.Linq;
5+
using System.Threading.Tasks;
6+
using Xunit;
7+
8+
namespace Open.ChannelExtensions.Tests
9+
{
10+
public static partial class BasicTests
11+
{
12+
const int testSize1 = 10000001;
13+
const int testSize2 = 30000001;
14+
15+
[Theory]
16+
[InlineData(testSize1)]
17+
[InlineData(testSize2)]
18+
public static async Task ReadAll(int testSize)
19+
{
20+
var range = Enumerable.Range(0, testSize);
21+
var result = new List<int>(testSize);
22+
23+
var sw = Stopwatch.StartNew();
24+
var total = await range
25+
.ToChannel()
26+
.ReadAll(i => result.Add(i));
27+
sw.Stop();
28+
29+
Console.WriteLine("ReadAll(): {0}", sw.Elapsed);
30+
Console.WriteLine();
31+
32+
Assert.Equal(testSize, result.Count);
33+
Assert.True(result.SequenceEqual(range));
34+
result.Clear();
35+
}
36+
37+
[Theory]
38+
[InlineData(testSize1)]
39+
[InlineData(testSize2)]
40+
public static async Task ReadAllAsync(int testSize)
41+
{
42+
var range = Enumerable.Range(0, testSize);
43+
var result = new List<int>(testSize);
44+
45+
var sw = Stopwatch.StartNew();
46+
var total = await range
47+
.ToChannel()
48+
.ReadAllAsync(i =>
49+
{
50+
result.Add(i);
51+
return new ValueTask();
52+
});
53+
sw.Stop();
54+
55+
Console.WriteLine("Channel.ReadAllAsync(): {0}", sw.Elapsed);
56+
Console.WriteLine();
57+
58+
Assert.Equal(testSize, result.Count);
59+
Assert.True(result.SequenceEqual(range));
60+
result.Clear();
61+
}
62+
63+
[Theory]
64+
[InlineData(testSize1, 51)]
65+
[InlineData(testSize1, 5001)]
66+
[InlineData(testSize2, 51)]
67+
[InlineData(testSize2, 5001)]
68+
public static async Task BatchThenJoin(int testSize, int batchSize)
69+
{
70+
var range = Enumerable.Range(0, testSize);
71+
var result1 = new List<List<int>>(testSize / batchSize + 1);
72+
73+
{
74+
var sw = Stopwatch.StartNew();
75+
var total = await range
76+
.ToChannel()
77+
.Batch(batchSize)
78+
.ReadAll(i => result1.Add(i));
79+
sw.Stop();
80+
81+
Console.WriteLine("Channel.Batch({1}): {0}", sw.Elapsed, batchSize);
82+
Console.WriteLine();
83+
84+
var r = result1.SelectMany(e => e).ToList();
85+
Assert.Equal(testSize, r.Count);
86+
Assert.True(r.SequenceEqual(range));
87+
}
88+
89+
{
90+
var result2 = new List<int>(testSize);
91+
var sw = Stopwatch.StartNew();
92+
var total = await result1
93+
.ToChannel()
94+
.Join()
95+
.ReadAll(i => result2.Add(i));
96+
sw.Stop();
97+
98+
Console.WriteLine("Channel.Join(): {0}", sw.Elapsed);
99+
Console.WriteLine();
100+
101+
Assert.Equal(testSize, result2.Count);
102+
Assert.True(result2.SequenceEqual(range));
103+
result2.Clear();
104+
result2.TrimExcess();
105+
}
106+
107+
result1.Clear();
108+
result1.TrimExcess();
109+
}
110+
111+
[Theory]
112+
[InlineData(testSize1, 51)]
113+
[InlineData(testSize1, 5001)]
114+
[InlineData(testSize2, 51)]
115+
[InlineData(testSize2, 5001)]
116+
public static async Task BatchJoin(int testSize, int batchSize)
117+
{
118+
var range = Enumerable.Range(0, testSize);
119+
var result = new List<int>(testSize);
120+
121+
var sw = Stopwatch.StartNew();
122+
var total = await range
123+
.ToChannel()
124+
.Batch(batchSize)
125+
.Join()
126+
.ReadAll(i => result.Add(i));
127+
sw.Stop();
128+
129+
Console.WriteLine("Channel.Batch({1}).Join(): {0}", sw.Elapsed, batchSize);
130+
Console.WriteLine();
131+
132+
Assert.Equal(testSize, result.Count);
133+
Assert.True(result.SequenceEqual(range));
134+
}
135+
}
136+
}
Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,21 @@
1-
<Project Sdk="Microsoft.NET.Sdk">
1+
<Project Sdk="Microsoft.NET.Sdk">
22

33
<PropertyGroup>
44
<OutputType>Exe</OutputType>
55
<TargetFramework>netcoreapp3.0</TargetFramework>
66
<LangVersion>latest</LangVersion>
7-
</PropertyGroup>
7+
<IsPackable>false</IsPackable>
8+
</PropertyGroup>
89

910
<ItemGroup>
1011
<ProjectReference Include="..\Open.ChannelExtensions\Open.ChannelExtensions.csproj" />
1112
</ItemGroup>
1213

14+
<ItemGroup>
15+
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.2.0" />
16+
<PackageReference Include="xunit" Version="2.4.0" />
17+
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.0" />
18+
<PackageReference Include="coverlet.collector" Version="1.0.1" />
19+
</ItemGroup>
20+
1321
</Project>

Open.ChannelExtensions.sln

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ VisualStudioVersion = 16.0.28803.156
55
MinimumVisualStudioVersion = 10.0.40219.1
66
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Open.ChannelExtensions", "Open.ChannelExtensions\Open.ChannelExtensions.csproj", "{04768159-4AD9-434D-A849-8901D7B0E846}"
77
EndProject
8-
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Open.ChannelExtensions.Tests", "Open.ChannelExtensions.Tests\Open.ChannelExtensions.Tests.csproj", "{CEB5E3C7-E8A9-4F30-92AD-46838A152BD2}"
8+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Open.ChannelExtensions.Tests", "Open.ChannelExtensions.Tests\Open.ChannelExtensions.Tests.csproj", "{CEB5E3C7-E8A9-4F30-92AD-46838A152BD2}"
9+
EndProject
10+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Open.ChannelExtensions.ComparisonTests", "Open.ChannelExtensions.ComparisonTests\Open.ChannelExtensions.ComparisonTests.csproj", "{F13B7605-FB04-4769-B182-77482C900B28}"
911
EndProject
1012
Global
1113
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -21,6 +23,10 @@ Global
2123
{CEB5E3C7-E8A9-4F30-92AD-46838A152BD2}.Debug|Any CPU.Build.0 = Debug|Any CPU
2224
{CEB5E3C7-E8A9-4F30-92AD-46838A152BD2}.Release|Any CPU.ActiveCfg = Release|Any CPU
2325
{CEB5E3C7-E8A9-4F30-92AD-46838A152BD2}.Release|Any CPU.Build.0 = Release|Any CPU
26+
{F13B7605-FB04-4769-B182-77482C900B28}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
27+
{F13B7605-FB04-4769-B182-77482C900B28}.Debug|Any CPU.Build.0 = Debug|Any CPU
28+
{F13B7605-FB04-4769-B182-77482C900B28}.Release|Any CPU.ActiveCfg = Release|Any CPU
29+
{F13B7605-FB04-4769-B182-77482C900B28}.Release|Any CPU.Build.0 = Release|Any CPU
2430
EndGlobalSection
2531
GlobalSection(SolutionProperties) = preSolution
2632
HideSolutionNode = FALSE
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
using System;
2+
using System.Diagnostics.Contracts;
3+
using System.Threading;
4+
using System.Threading.Channels;
5+
using System.Threading.Tasks;
6+
7+
namespace Open.ChannelExtensions
8+
{
9+
abstract class BufferingChannelReader<TIn, TOut> : ChannelReader<TOut>
10+
{
11+
protected readonly ChannelReader<TIn> Source;
12+
protected readonly Channel<TOut> Buffer;
13+
public BufferingChannelReader(ChannelReader<TIn> source, bool singleReader)
14+
{
15+
Source = source ?? throw new ArgumentNullException(nameof(source));
16+
Contract.EndContractBlock();
17+
18+
Buffer = Extensions.CreateChannel<TOut>(-1, singleReader);
19+
20+
if (Source.Completion.IsCompleted)
21+
Buffer.Writer.Complete(Source.Completion.Exception);
22+
else
23+
{
24+
Source.Completion.ContinueWith(t =>
25+
{
26+
// Need to be sure writing is done before we continue...
27+
lock (Buffer)
28+
{
29+
TryPipeItems();
30+
Buffer.Writer.Complete(t.Exception);
31+
}
32+
});
33+
}
34+
}
35+
36+
public override Task Completion => Buffer.Reader.Completion;
37+
38+
protected abstract bool TryPipeItems();
39+
40+
public override bool TryRead(out TOut item)
41+
{
42+
do
43+
{
44+
if (Buffer.Reader.TryRead(out item))
45+
return true;
46+
}
47+
while (TryPipeItems());
48+
49+
#pragma warning disable CS8653 // A default expression introduces a null value for a type parameter.
50+
item = default;
51+
#pragma warning restore CS8653 // A default expression introduces a null value for a type parameter.
52+
return false;
53+
}
54+
55+
public override ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)
56+
{
57+
if (cancellationToken.IsCancellationRequested)
58+
return new ValueTask<bool>(Task.FromCanceled<bool>(cancellationToken));
59+
60+
var b = Buffer.Reader.WaitToReadAsync(cancellationToken);
61+
if (b.IsCompletedSuccessfully || Source.Completion.IsCompleted)
62+
return b;
63+
64+
var s = Source.WaitToReadAsync(cancellationToken);
65+
if (s.IsCompletedSuccessfully)
66+
return s.Result ? new ValueTask<bool>(true) : b;
67+
68+
return WaitCore();
69+
70+
async ValueTask<bool> WaitCore()
71+
{
72+
cancellationToken.ThrowIfCancellationRequested();
73+
74+
// Not sure if there's a better way to 'WhenAny' with a ValueTask yet.
75+
var bt = b.AsTask();
76+
var st = s.AsTask();
77+
var first = await Task.WhenAny(bt, st);
78+
// Either one? Ok go.
79+
if (first.Result) return true;
80+
// Buffer returned false? We're done.
81+
if (first == bt) return false;
82+
// Second return false? Wait for buffer.
83+
return await bt;
84+
}
85+
}
86+
}
87+
}

0 commit comments

Comments
 (0)