Skip to content

Commit a8dff0e

Browse files
author
Oren (electricessence)
committed
Fixed batching reader issue.
1 parent 75eacc0 commit a8dff0e

File tree

4 files changed

+184
-78
lines changed

4 files changed

+184
-78
lines changed
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Diagnostics;
4+
using System.Linq;
5+
using System.Threading;
6+
using System.Threading.Channels;
7+
using System.Threading.Tasks;
8+
using Xunit;
9+
10+
namespace Open.ChannelExtensions.Tests
11+
{
12+
public static class BatchTests
13+
{
14+
[Fact]
15+
public static async Task SimpleBatch2Test()
16+
{
17+
var c = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleReader = false, SingleWriter = false });
18+
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
19+
Task.Run(async () =>
20+
{
21+
await Task.Delay(1000);
22+
c.Writer.TryWrite(1);
23+
c.Writer.TryWrite(2);
24+
c.Writer.TryWrite(3);
25+
c.Writer.TryWrite(4);
26+
c.Writer.TryWrite(5);
27+
c.Writer.TryWrite(6);
28+
c.Writer.Complete();
29+
});
30+
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
31+
32+
33+
await c.Reader
34+
.Batch(2)
35+
.ReadAllAsync(async (batch, i) =>
36+
{
37+
switch(i)
38+
{
39+
case 0:
40+
Assert.Equal(1, batch[0]);
41+
Assert.Equal(2, batch[1]);
42+
break;
43+
case 1:
44+
Assert.Equal(3, batch[0]);
45+
Assert.Equal(4, batch[1]);
46+
break;
47+
case 2:
48+
Assert.Equal(5, batch[0]);
49+
Assert.Equal(6, batch[1]);
50+
break;
51+
default:
52+
throw new Exception("Shouldn't arrive here.");
53+
}
54+
await Task.Delay(500);
55+
});
56+
57+
}
58+
59+
[Fact]
60+
public static async Task Batch2TestWithDelay()
61+
{
62+
var c = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleReader = false, SingleWriter = false });
63+
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
64+
Task.Run(async () =>
65+
{
66+
await Task.Delay(1000);
67+
c.Writer.TryWrite(1);
68+
c.Writer.TryWrite(2);
69+
c.Writer.TryWrite(3);
70+
c.Writer.TryWrite(4);
71+
c.Writer.TryWrite(5);
72+
c.Writer.TryWrite(6);
73+
});
74+
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
75+
76+
using var tokenSource = new CancellationTokenSource();
77+
var token = tokenSource.Token;
78+
await c.Reader
79+
.Batch(2)
80+
.ReadAllAsync(async (batch, i) =>
81+
{
82+
switch (i)
83+
{
84+
case 0:
85+
Assert.Equal(1, batch[0]);
86+
Assert.Equal(2, batch[1]);
87+
break;
88+
case 1:
89+
Assert.Equal(3, batch[0]);
90+
Assert.Equal(4, batch[1]);
91+
_ = Task.Run(async () => {
92+
await Task.Delay(60000, token);
93+
if(!token.IsCancellationRequested) c.Writer.TryComplete(new Exception("Should have completed successfuly."));
94+
});
95+
break;
96+
case 2:
97+
Assert.Equal(5, batch[0]);
98+
Assert.Equal(6, batch[1]);
99+
tokenSource.Cancel();
100+
c.Writer.Complete();
101+
break;
102+
default:
103+
throw new Exception("Shouldn't arrive here.");
104+
}
105+
await Task.Delay(500);
106+
});
107+
108+
}
109+
}
110+
}

Open.ChannelExtensions/BatchingChannelReader.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,14 +78,14 @@ protected override bool TryPipeItems()
7878

7979
while (source.TryRead(out T item))
8080
{
81+
c.Add(item);
82+
8183
if (c.Count == _batchSize)
8284
{
83-
_current = new List<T>(_batchSize) { item };
85+
_current = new List<T>(_batchSize);
8486
Buffer.Writer.TryWrite(c);
8587
return true;
8688
}
87-
88-
c.Add(item);
8989
}
9090

9191
return false;

0 commit comments

Comments
 (0)