Skip to content

Commit f8a1797

Browse files
committed
Add a "ChunkAsync" extension method for IAsyncEnumerable
1 parent 85f1eb7 commit f8a1797

File tree

2 files changed

+161
-0
lines changed
  • Ix.NET/Source
    • System.Linq.Async.Tests/System/Linq/Operators
    • System.Linq.Async/System/Linq/Operators

2 files changed

+161
-0
lines changed
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT License.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using System;
6+
using System.Collections.Generic;
7+
using System.Linq;
8+
using System.Threading.Tasks;
9+
using Xunit;
10+
11+
namespace Tests
12+
{
13+
public class Chunk : AsyncEnumerableTests
14+
{
15+
[Fact]
16+
public async Task Chunk_Null()
17+
{
18+
await Assert.ThrowsAsync<ArgumentNullException>(async () =>
19+
{
20+
IAsyncEnumerable<int>? xs = null;
21+
var ys = xs!.ChunkAsync(24);
22+
23+
var e = ys.GetAsyncEnumerator();
24+
await e.MoveNextAsync();
25+
});
26+
}
27+
28+
[Theory]
29+
[InlineData(0)]
30+
[InlineData(-1)]
31+
public async Task Chunk_NonPositiveSize(int size)
32+
{
33+
await Assert.ThrowsAsync<ArgumentOutOfRangeException>("size", async () =>
34+
{
35+
var xs = new[] { 24 }.ToAsyncEnumerable();
36+
var ys = xs.ChunkAsync(size);
37+
38+
var e = ys.GetAsyncEnumerator();
39+
await e.MoveNextAsync();
40+
});
41+
}
42+
43+
[Fact]
44+
public async Task Chunk_Simple_Evenly()
45+
{
46+
var xs = new[] { 1, 1, 4, 5, 1, 4 }.ToAsyncEnumerable();
47+
var ys = xs.ChunkAsync(3);
48+
49+
var e = ys.GetAsyncEnumerator();
50+
await HasNextAsync(e, new[] { 1, 1, 4 });
51+
await HasNextAsync(e, new[] { 5, 1, 4 });
52+
await NoNextAsync(e);
53+
}
54+
55+
[Fact]
56+
public async Task Chunk_Simple_Unevenly()
57+
{
58+
var xs = new[] { 1, 9, 1, 9, 8, 1, 0 }.ToAsyncEnumerable();
59+
var ys = xs.ChunkAsync(4);
60+
61+
var e = ys.GetAsyncEnumerator();
62+
await HasNextAsync(e, new[] { 1, 9, 1, 9 });
63+
await HasNextAsync(e, new[] { 8, 1, 0 });
64+
await NoNextAsync(e);
65+
}
66+
67+
[Fact]
68+
public async Task Chunk_SourceSmallerThanChunkSize()
69+
{
70+
var xs = new[] { 8, 9, 3 }.ToAsyncEnumerable();
71+
var ys = xs.ChunkAsync(4);
72+
73+
var e = ys.GetAsyncEnumerator();
74+
await HasNextAsync(e, new[] { 8, 9, 3 });
75+
await NoNextAsync(e);
76+
}
77+
78+
[Fact]
79+
public async Task Chunk_EmptySource()
80+
{
81+
var xs = new int[0].ToAsyncEnumerable();
82+
var ys = xs.ChunkAsync(24);
83+
84+
var e = ys.GetAsyncEnumerator();
85+
await NoNextAsync(e);
86+
}
87+
}
88+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT License.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using System.Collections.Generic;
6+
using System.Diagnostics;
7+
using System.Runtime.CompilerServices;
8+
using System.Threading;
9+
using System.Threading.Tasks;
10+
11+
namespace System.Linq
12+
{
13+
public static partial class AsyncEnumerable
14+
{
15+
/// <summary>
16+
/// Split the elements of an async-enumerable sequence into chunks of size at most <paramref name="size"/>.
17+
/// </summary>
18+
/// <remarks>
19+
/// Every chunk except the last will be of size <paramref name="size"/>.
20+
/// The last chunk will contain the remaining elements and may be of a smaller size.
21+
/// </remarks>
22+
/// <param name="source">
23+
/// An <see cref="IAsyncEnumerable{T}"/> whose elements to chunk.
24+
/// </param>
25+
/// <param name="size">
26+
/// Maximum size of each chunk.
27+
/// </param>
28+
/// <param name="cancellationToken">The optional cancellation token to be used for cancelling the sequence at any time.</param>
29+
/// <typeparam name="TSource">
30+
/// The type of the elements of source.
31+
/// </typeparam>
32+
/// <returns>
33+
/// An <see cref="IAsyncEnumerable{T}"/> that contains the elements the input sequence split into chunks of size <paramref name="size"/>.
34+
/// </returns>
35+
/// <exception cref="ArgumentNullException">
36+
/// <paramref name="source"/> is null.
37+
/// </exception>
38+
/// <exception cref="ArgumentOutOfRangeException">
39+
/// <paramref name="size"/> is below 1.
40+
/// </exception>
41+
public static async IAsyncEnumerable<TSource[]> ChunkAsync<TSource>(this IAsyncEnumerable<TSource> source, int size, [EnumeratorCancellation] CancellationToken cancellationToken = default)
42+
{
43+
if (source == null)
44+
throw Error.ArgumentNull(nameof(source));
45+
46+
if (size < 1)
47+
throw Error.ArgumentOutOfRange(nameof(size));
48+
49+
await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);
50+
51+
if (await e.MoveNextAsync())
52+
{
53+
var chunkBuilder = new List<TSource>();
54+
while (true)
55+
{
56+
do
57+
{
58+
chunkBuilder.Add(e.Current);
59+
}
60+
while (chunkBuilder.Count < size && await e.MoveNextAsync());
61+
62+
yield return chunkBuilder.ToArray();
63+
64+
if (chunkBuilder.Count < size || !await e.MoveNextAsync())
65+
{
66+
yield break;
67+
}
68+
chunkBuilder.Clear();
69+
}
70+
}
71+
}
72+
}
73+
}

0 commit comments

Comments
 (0)