|
1 | 1 | using System; |
| 2 | +using System.Collections.Generic; |
2 | 3 | using System.Diagnostics; |
| 4 | +using System.Linq; |
3 | 5 | using System.Threading; |
4 | 6 | using System.Threading.Channels; |
5 | 7 | using System.Threading.Tasks; |
@@ -269,5 +271,81 @@ public static async Task TimeoutTest1() |
269 | 271 | })); |
270 | 272 | } |
271 | 273 |
|
| 274 | + |
| 275 | + |
| 276 | + [Fact] |
| 277 | + public static async Task BatchReadBehavior() |
| 278 | + { |
| 279 | + var c = Channel.CreateBounded<int>(new BoundedChannelOptions(20) { SingleReader = false, SingleWriter = false }); |
| 280 | + BatchingChannelReader<int> reader = c.Reader.Batch(10); |
| 281 | + |
| 282 | + var queue = new Queue<int>(Enumerable.Range(0, 100)); |
| 283 | + int e; |
| 284 | + while(queue.TryDequeue(out e) && c.Writer.TryWrite(e)) |
| 285 | + await Task.Yield(); |
| 286 | + |
| 287 | + Assert.True(69 <= queue.Count); |
| 288 | + await reader.WaitToReadAsync(); |
| 289 | + // At this point, a batch is prepared and there is room in the channel. |
| 290 | + await Dequeue(); |
| 291 | + |
| 292 | + Assert.True(59 <= queue.Count); |
| 293 | + Assert.True(reader.TryRead(out var batch)); |
| 294 | + Assert.Equal(10, batch.Count); |
| 295 | + // At this point nothing is waiting so either a wait must occur or a read to trigger a new batch. |
| 296 | + |
| 297 | + Assert.True(reader.TryRead(out batch)); |
| 298 | + Assert.Equal(10, batch.Count); |
| 299 | + await Dequeue(); |
| 300 | + |
| 301 | + Assert.True(49 <= queue.Count); |
| 302 | + Assert.True(reader.TryRead(out batch)); |
| 303 | + Assert.Equal(10, batch.Count); |
| 304 | + await Dequeue(); |
| 305 | + |
| 306 | + Assert.True(39 <= queue.Count); |
| 307 | + Assert.True(reader.TryRead(out batch)); |
| 308 | + Assert.Equal(10, batch.Count); |
| 309 | + await Dequeue(); |
| 310 | + |
| 311 | + Assert.True(29 <= queue.Count); |
| 312 | + Assert.True(reader.TryRead(out batch)); |
| 313 | + Assert.Equal(10, batch.Count); |
| 314 | + await Dequeue(); |
| 315 | + |
| 316 | + Assert.True(19 <= queue.Count); |
| 317 | + Assert.True(reader.TryRead(out batch)); |
| 318 | + Assert.Equal(10, batch.Count); |
| 319 | + await Dequeue(); |
| 320 | + |
| 321 | + Assert.True(09 <= queue.Count); |
| 322 | + Assert.True(reader.TryRead(out batch)); |
| 323 | + Assert.Equal(10, batch.Count); |
| 324 | + await Dequeue(); |
| 325 | + |
| 326 | + Assert.True(reader.TryRead(out batch)); |
| 327 | + Assert.Equal(10, batch.Count); |
| 328 | + await Dequeue(); |
| 329 | + |
| 330 | + Assert.Empty(queue); |
| 331 | + c.Writer.Complete(); |
| 332 | + |
| 333 | + Assert.True(reader.TryRead(out batch)); |
| 334 | + Assert.Equal(10, batch.Count); |
| 335 | + |
| 336 | + Assert.True(reader.TryRead(out batch)); |
| 337 | + Assert.Equal(10, batch.Count); |
| 338 | + |
| 339 | + Assert.False(reader.TryRead(out _)); |
| 340 | + |
| 341 | + async ValueTask Dequeue() |
| 342 | + { |
| 343 | + Assert.True(c.Writer.TryWrite(e)); |
| 344 | + while (queue.TryDequeue(out e) && c.Writer.TryWrite(e)) |
| 345 | + await Task.Yield(); |
| 346 | + } |
| 347 | + } |
| 348 | + |
| 349 | + |
272 | 350 | } |
273 | 351 |
|
0 commit comments