Skip to content

Commit 7fd2548

Browse files
Added expanded batch type capability: .BatchToQueues().
1 parent d8674b4 commit 7fd2548

File tree

6 files changed

+250
-144
lines changed

6 files changed

+250
-144
lines changed

Open.ChannelExtensions.ComparisonTests/Program.cs

Lines changed: 116 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -5,139 +5,138 @@
55
using System.Threading.Tasks;
66
using System.Threading.Tasks.Dataflow;
77

8-
namespace Open.ChannelExtensions.ComparisonTests
8+
namespace Open.ChannelExtensions.ComparisonTests;
9+
10+
static class Program
911
{
10-
class Program
12+
static async Task Main()
1113
{
12-
static async Task Main()
14+
const int repeat = 50;
15+
const int concurrency = 4;
16+
const int testSize = 30000001;
17+
1318
{
14-
const int repeat = 50;
15-
const int concurrency = 4;
16-
const int testSize = 30000001;
17-
18-
{
19-
Console.WriteLine("Standard DataFlow operation test...");
20-
var block = new ActionBlock<int>(async i => await Delay(i));
21-
var sw = Stopwatch.StartNew();
22-
foreach (int i in Enumerable.Range(0, repeat))
23-
block.Post(i);
24-
block.Complete();
25-
await block.Completion;
26-
sw.Stop();
27-
Console.WriteLine(sw.Elapsed);
28-
Console.WriteLine();
29-
}
30-
31-
await BasicTests.ReadAll(testSize);
32-
await BasicTests.ReadAllAsync(testSize);
33-
await BasicTests.BatchThenJoin(testSize, 5001);
34-
await BasicTests.BatchJoin(testSize, 50);
35-
36-
{
37-
Console.WriteLine("Standard Channel filter test...");
38-
System.Collections.Generic.IEnumerable<ValueTask<int>> source = Enumerable
39-
.Repeat((Func<int, ValueTask<int>>)Delay, repeat)
40-
.Select((t, i) => t(i));
41-
42-
var sw = Stopwatch.StartNew();
43-
long total = await source
44-
.ToChannelAsync(singleReader: true)
45-
.Filter(i => i % 2 == 0)
46-
.ReadAll(Dummy);
47-
sw.Stop();
48-
49-
Debug.Assert(total == repeat / 2);
50-
Console.WriteLine(sw.Elapsed);
51-
Console.WriteLine();
52-
}
53-
54-
{
55-
Console.WriteLine("Concurrent DataFlow operation test...");
56-
var sw = Stopwatch.StartNew();
57-
var block = new ActionBlock<int>(async i => await Delay(i), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = concurrency });
58-
foreach (int i in Enumerable.Range(0, repeat))
59-
block.Post(i);
60-
block.Complete();
61-
await block.Completion;
62-
sw.Stop();
63-
Console.WriteLine(sw.Elapsed);
64-
Console.WriteLine();
65-
}
66-
67-
{
68-
Console.WriteLine("Concurrent Channel operation test...");
69-
var sw = Stopwatch.StartNew();
70-
await Enumerable
71-
.Repeat((Func<int, ValueTask<int>>)Delay, repeat)
72-
.Select((t, i) => t(i))
73-
.ToChannelAsync(singleReader: false, maxConcurrency: concurrency)
74-
.ReadAllConcurrently(4, Dummy);
75-
sw.Stop();
76-
Console.WriteLine(sw.Elapsed);
77-
Console.WriteLine();
78-
}
79-
80-
{
81-
Console.WriteLine("Pipe operation test...");
82-
var sw = Stopwatch.StartNew();
83-
long total = await Enumerable
84-
.Repeat((Func<int, ValueTask<int>>)Delay, repeat)
85-
.Select((t, i) => t(i))
86-
.ToChannelAsync()
87-
.Pipe(i => i * 2)
88-
.ReadAll(Dummy);
89-
sw.Stop();
90-
Debug.Assert(total == repeat);
91-
Console.WriteLine(sw.Elapsed);
92-
Console.WriteLine();
93-
}
94-
95-
{
96-
Console.WriteLine("Transform operation test...");
97-
var sw = Stopwatch.StartNew();
98-
await Enumerable
99-
.Repeat((Func<int, ValueTask<int>>)Delay, repeat)
100-
.Select((t, i) => t(i))
101-
.ToChannelAsync()
102-
.Transform(i => i * 2L)
103-
.ReadAll(Dummy);
104-
sw.Stop();
105-
Console.WriteLine(sw.Elapsed);
106-
Console.WriteLine();
107-
}
19+
Console.WriteLine("Standard DataFlow operation test...");
20+
var block = new ActionBlock<int>(async i => await Delay(i));
21+
var sw = Stopwatch.StartNew();
22+
foreach (int i in Enumerable.Range(0, repeat))
23+
block.Post(i);
24+
block.Complete();
25+
await block.Completion;
26+
sw.Stop();
27+
Console.WriteLine(sw.Elapsed);
28+
Console.WriteLine();
29+
}
10830

109-
#if NETCOREAPP3_0
110-
{
111-
Console.WriteLine("Async Enumerable test...");
112-
var sw = Stopwatch.StartNew();
113-
await foreach (var e in Enumerable
114-
.Repeat((Func<int, ValueTask<int>>)Delay, repeat)
115-
.Select((t, i) => t(i))
116-
.ToChannelAsync()
117-
.ReadAllAsync())
118-
Dummy(e);
119-
sw.Stop();
120-
Console.WriteLine(sw.Elapsed);
121-
Console.WriteLine();
122-
}
123-
#endif
31+
await BasicTests.ReadAll(testSize);
32+
await BasicTests.ReadAllAsync(testSize);
33+
await BasicTests.BatchThenJoin(testSize, 5001);
34+
await BasicTests.BatchJoin(testSize, 50);
12435

36+
{
37+
Console.WriteLine("Standard Channel filter test...");
38+
System.Collections.Generic.IEnumerable<ValueTask<int>> source = Enumerable
39+
.Repeat((Func<int, ValueTask<int>>)Delay, repeat)
40+
.Select((t, i) => t(i));
41+
42+
var sw = Stopwatch.StartNew();
43+
long total = await source
44+
.ToChannelAsync(singleReader: true)
45+
.Filter(i => i % 2 == 0)
46+
.ReadAll(Dummy);
47+
sw.Stop();
48+
49+
Debug.Assert(total == repeat / 2);
50+
Console.WriteLine(sw.Elapsed);
51+
Console.WriteLine();
12552
}
12653

127-
static void Dummy(int i)
12854
{
55+
Console.WriteLine("Concurrent DataFlow operation test...");
56+
var sw = Stopwatch.StartNew();
57+
var block = new ActionBlock<int>(async i => await Delay(i), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = concurrency });
58+
foreach (int i in Enumerable.Range(0, repeat))
59+
block.Post(i);
60+
block.Complete();
61+
await block.Completion;
62+
sw.Stop();
63+
Console.WriteLine(sw.Elapsed);
64+
Console.WriteLine();
65+
}
12966

67+
{
68+
Console.WriteLine("Concurrent Channel operation test...");
69+
var sw = Stopwatch.StartNew();
70+
await Enumerable
71+
.Repeat((Func<int, ValueTask<int>>)Delay, repeat)
72+
.Select((t, i) => t(i))
73+
.ToChannelAsync(singleReader: false, maxConcurrency: concurrency)
74+
.ReadAllConcurrently(4, Dummy);
75+
sw.Stop();
76+
Console.WriteLine(sw.Elapsed);
77+
Console.WriteLine();
13078
}
13179

132-
static void Dummy(long i)
13380
{
81+
Console.WriteLine("Pipe operation test...");
82+
var sw = Stopwatch.StartNew();
83+
long total = await Enumerable
84+
.Repeat((Func<int, ValueTask<int>>)Delay, repeat)
85+
.Select((t, i) => t(i))
86+
.ToChannelAsync()
87+
.Pipe(i => i * 2)
88+
.ReadAll(Dummy);
89+
sw.Stop();
90+
Debug.Assert(total == repeat);
91+
Console.WriteLine(sw.Elapsed);
92+
Console.WriteLine();
93+
}
13494

95+
{
96+
Console.WriteLine("Transform operation test...");
97+
var sw = Stopwatch.StartNew();
98+
await Enumerable
99+
.Repeat((Func<int, ValueTask<int>>)Delay, repeat)
100+
.Select((t, i) => t(i))
101+
.ToChannelAsync()
102+
.Transform(i => i * 2L)
103+
.ReadAll(Dummy);
104+
sw.Stop();
105+
Console.WriteLine(sw.Elapsed);
106+
Console.WriteLine();
135107
}
136108

137-
static async ValueTask<int> Delay(int i)
109+
#if NETCOREAPP3_0
138110
{
139-
await Task.Delay(100);
140-
return i;
111+
Console.WriteLine("Async Enumerable test...");
112+
var sw = Stopwatch.StartNew();
113+
await foreach (var e in Enumerable
114+
.Repeat((Func<int, ValueTask<int>>)Delay, repeat)
115+
.Select((t, i) => t(i))
116+
.ToChannelAsync()
117+
.ReadAllAsync())
118+
Dummy(e);
119+
sw.Stop();
120+
Console.WriteLine(sw.Elapsed);
121+
Console.WriteLine();
141122
}
123+
#endif
124+
125+
}
126+
127+
static void Dummy(int i)
128+
{
129+
130+
}
131+
132+
static void Dummy(long i)
133+
{
134+
135+
}
136+
137+
static async ValueTask<int> Delay(int i)
138+
{
139+
await Task.Delay(100);
140+
return i;
142141
}
143142
}

Open.ChannelExtensions.Tests/BasicTests.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,31 @@ public static async Task Batch(int testSize, int batchSize)
303303
Assert.True(r.SequenceEqual(range));
304304
}
305305

306+
307+
[Theory]
308+
[InlineData(1000, 51)]
309+
[InlineData(50, 1000)]
310+
[InlineData(1001, 51)]
311+
[InlineData(51, 5001)]
312+
[InlineData(75, 50)]
313+
public static async Task BatchToQueues(int testSize, int batchSize)
314+
{
315+
IEnumerable<int> range = Enumerable.Range(0, testSize);
316+
int expectedBatchCount = testSize / batchSize + (testSize % batchSize == 0 ? 0 : 1);
317+
var result1 = new List<Queue<int>>(expectedBatchCount);
318+
319+
long total = await range
320+
.ToChannel(singleReader: true)
321+
.BatchToQueues(batchSize, singleReader: true)
322+
.ReadAll(result1.Add);
323+
324+
Assert.Equal(expectedBatchCount, result1.Count);
325+
326+
var r = result1.SelectMany(e => e).ToList();
327+
Assert.Equal(testSize, r.Count);
328+
Assert.True(r.SequenceEqual(range));
329+
}
330+
306331
[Theory]
307332
[InlineData(testSize1, 51)]
308333
[InlineData(testSize1, 5001)]

Open.ChannelExtensions.Tests/BatchTests.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public static async Task ForceBatchTest()
120120
});
121121

122122
using var tokenSource = new CancellationTokenSource(10000);
123-
BatchingChannelReader<int> reader = c.Reader.Batch(3);
123+
var reader = c.Reader.Batch(3);
124124
Assert.Equal(2, await reader.ReadAllAsync(tokenSource.Token, async (batch, i) =>
125125
{
126126
switch (i)
@@ -149,7 +149,7 @@ public static async Task ForceBatchTest()
149149
public static async Task ForceBatchTest2()
150150
{
151151
var c = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleReader = false, SingleWriter = false });
152-
BatchingChannelReader<int> reader = c.Reader.Batch(3);
152+
var reader = c.Reader.Batch(3);
153153
_ = Task.Run(async () =>
154154
{
155155
await Task.Delay(1000);
@@ -194,7 +194,7 @@ public static async Task ForceBatchTest2()
194194
public static async Task TimeoutTest0()
195195
{
196196
var c = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleReader = false, SingleWriter = false });
197-
BatchingChannelReader<int> reader = c.Reader.Batch(10).WithTimeout(500);
197+
var reader = c.Reader.Batch(10).WithTimeout(500);
198198
var complete = false;
199199
_ = Task.Run(async () =>
200200
{
@@ -229,7 +229,7 @@ public static async Task TimeoutTest0()
229229
public static async Task TimeoutTest1()
230230
{
231231
var c = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleReader = false, SingleWriter = false });
232-
BatchingChannelReader<int> reader = c.Reader.Batch(10).WithTimeout(500);
232+
var reader = c.Reader.Batch(10).WithTimeout(500);
233233
_ = Task.Run(async () =>
234234
{
235235
for(var i = 0;i<15;i++)
@@ -272,7 +272,7 @@ public static async Task TimeoutTest1()
272272
public static async Task BatchReadBehavior()
273273
{
274274
var c = Channel.CreateBounded<int>(new BoundedChannelOptions(20) { SingleReader = false, SingleWriter = false });
275-
BatchingChannelReader<int> reader = c.Reader.Batch(10);
275+
var reader = c.Reader.Batch(10);
276276

277277
var queue = new Queue<int>(Enumerable.Range(0, 100));
278278
int e;

0 commit comments

Comments
 (0)