Skip to content

Commit d8674b4

Browse files
Improved reading for cancellation token logic (CanBeCancelled) and added "ReadAllAsEnumerables" methods.
1 parent 352b4ce commit d8674b4

File tree

4 files changed

+351
-25
lines changed

4 files changed

+351
-25
lines changed

Open.ChannelExtensions.Tests/BasicTests.cs

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,127 @@ public static async Task ReadAll(int testSize)
6464
result.Clear();
6565
}
6666

67+
[Fact]
68+
public static async Task ReadAllAsEnumerables1()
69+
{
70+
var channel = Channel.CreateUnbounded<int>();
71+
var read = channel.Reader.ReadAllAsEnumerables(e =>
72+
{
73+
Thread.Sleep(100);
74+
Assert.Equal(1000, e.Count());
75+
});
76+
77+
await channel.Writer.WriteAll(Enumerable.Range(0, 1000));
78+
for (var i = 0; i < 5; i++)
79+
{
80+
await Task.Delay(1000);
81+
await channel.Writer.WriteAll(Enumerable.Range(0, 1000));
82+
}
83+
84+
channel.Writer.Complete();
85+
await read;
86+
}
87+
88+
[Fact]
89+
public static async Task ReadAllAsEnumerablesAsync1()
90+
{
91+
var channel = Channel.CreateUnbounded<int>();
92+
var read = channel.Reader.ReadAllAsEnumerablesAsync(async e =>
93+
{
94+
await Task.Delay(100);
95+
Assert.Equal(1000, e.Count());
96+
});
97+
98+
await channel.Writer.WriteAll(Enumerable.Range(0, 1000));
99+
for (var i = 0; i < 5; i++)
100+
{
101+
await Task.Delay(1000);
102+
await channel.Writer.WriteAll(Enumerable.Range(0, 1000));
103+
}
104+
105+
channel.Writer.Complete();
106+
await read;
107+
}
108+
109+
[Fact]
110+
public static async Task ReadAllConcurrentlyAsEnumerablesAsync()
111+
{
112+
var channel = Channel.CreateUnbounded<int>();
113+
int total = 0;
114+
var read = channel.Reader.ReadAllConcurrentlyAsEnumerablesAsync(3, async e =>
115+
{
116+
foreach(var i in e)
117+
{
118+
await Task.Delay(1);
119+
Interlocked.Increment(ref total);
120+
}
121+
});
122+
123+
await channel.Writer.WriteAll(Enumerable.Range(0, 1000));
124+
for (var i = 0; i < 5; i++)
125+
{
126+
await Task.Delay(1000);
127+
await channel.Writer.WriteAll(Enumerable.Range(0, 1000));
128+
}
129+
130+
channel.Writer.Complete();
131+
await read;
132+
133+
Assert.Equal(6000, total);
134+
}
135+
136+
137+
[Fact]
138+
public static async Task ReadAllConcurrentlyAsEnumerables()
139+
{
140+
var channel = Channel.CreateUnbounded<int>();
141+
int total = 0;
142+
var read = channel.Reader.ReadAllConcurrentlyAsEnumerables(3, e =>
143+
{
144+
foreach (var i in e)
145+
{
146+
for(var n = 0; n < 2000000; n++)
147+
{
148+
// loop delay
149+
}
150+
Interlocked.Increment(ref total);
151+
}
152+
});
153+
154+
await channel.Writer.WriteAll(Enumerable.Range(0, 1000));
155+
for (var i = 0; i < 5; i++)
156+
{
157+
await Task.Delay(1000);
158+
await channel.Writer.WriteAll(Enumerable.Range(0, 1000));
159+
}
160+
161+
channel.Writer.Complete();
162+
await read;
163+
164+
Assert.Equal(6000, total);
165+
}
166+
167+
168+
[Fact]
169+
public static async Task ReadAllAsEnumerables2()
170+
{
171+
var channel = Channel.CreateUnbounded<int>();
172+
int total = 0;
173+
var read = channel.Reader.ReadAllAsEnumerables(e => total += e.Count());
174+
175+
await channel.Writer.WriteAll(Enumerable.Range(0, 1000));
176+
for (var i = 0; i < 5; i++)
177+
{
178+
await Task.Delay(1000);
179+
await channel.Writer.WriteAll(Enumerable.Range(0, 1000));
180+
}
181+
182+
channel.Writer.Complete();
183+
await read;
184+
185+
Assert.Equal(6000, total);
186+
}
187+
67188
[Theory]
68189
[InlineData(testSize1)]
69190
[InlineData(testSize2)]

Open.ChannelExtensions/Extensions.Read.cs

Lines changed: 131 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,27 @@ public static partial class Extensions
1717
/// </summary>
1818
/// <typeparam name="T">The item type.</typeparam>
1919
/// <param name="reader">The channel reader to read from.</param>
20+
/// <param name="cancellationToken">An optional cancellation token.</param>
2021
/// <returns>An enumerable that will read from the channel until no more are available for read</returns>
21-
public static IEnumerable<T> ReadAvailable<T>(this ChannelReader<T> reader)
22+
public static IEnumerable<T> ReadAvailable<T>(this ChannelReader<T> reader, CancellationToken cancellationToken = default)
2223
{
2324
if (reader is null) throw new ArgumentNullException(nameof(reader));
2425
Contract.EndContractBlock();
2526

26-
return ReadAvailableCore(reader);
27+
return ReadAvailableCore(reader, cancellationToken);
2728

28-
static IEnumerable<T> ReadAvailableCore(ChannelReader<T> reader)
29+
static IEnumerable<T> ReadAvailableCore(ChannelReader<T> reader, CancellationToken token)
2930
{
30-
while (reader.TryRead(out T? e))
31-
yield return e;
31+
if (token.CanBeCanceled)
32+
{
33+
while (!token.IsCancellationRequested && reader.TryRead(out T? e))
34+
yield return e;
35+
}
36+
else
37+
{
38+
while (reader.TryRead(out T? e))
39+
yield return e;
40+
}
3241
}
3342
}
3443

@@ -48,22 +57,43 @@ public static async ValueTask<List<T>> ReadBatchAsync<T>(this ChannelReader<T> r
4857

4958
var results = new List<T>(max);
5059

51-
do
60+
if (cancellationToken.CanBeCanceled)
5261
{
53-
while (
54-
results.Count < max
55-
&& !cancellationToken.IsCancellationRequested
56-
&& reader.TryRead(out T? item))
62+
do
5763
{
58-
results.Add(item);
64+
while (
65+
results.Count < max
66+
&& !cancellationToken.IsCancellationRequested
67+
&& reader.TryRead(out T? item))
68+
{
69+
results.Add(item);
70+
}
71+
72+
if (results.Count == max)
73+
return results;
74+
75+
cancellationToken.ThrowIfCancellationRequested();
5976
}
77+
while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false));
78+
}
79+
else
80+
{
81+
do
82+
{
83+
while (
84+
results.Count < max
85+
&& reader.TryRead(out T? item))
86+
{
87+
results.Add(item);
88+
}
6089

61-
if (results.Count == max)
62-
return results;
90+
if (results.Count == max)
91+
return results;
6392

64-
cancellationToken.ThrowIfCancellationRequested();
93+
cancellationToken.ThrowIfCancellationRequested();
94+
}
95+
while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false));
6596
}
66-
while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false));
6797

6898
results.TrimExcess();
6999
return results;
@@ -95,18 +125,32 @@ public static async ValueTask<long> ReadUntilCancelledAsync<T>(this ChannelReade
95125
long index = 0;
96126
try
97127
{
98-
do
128+
if(cancellationToken.CanBeCanceled)
99129
{
130+
do
131+
{
132+
while (
133+
!cancellationToken.IsCancellationRequested
134+
&& reader.TryRead(out T? item))
135+
{
136+
await receiver(item, index++).ConfigureAwait(false);
137+
}
138+
}
100139
while (
101140
!cancellationToken.IsCancellationRequested
102-
&& reader.TryRead(out T? item))
141+
&& await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false));
142+
}
143+
else
144+
{
145+
do
103146
{
104-
await receiver(item, index++).ConfigureAwait(false);
147+
while (reader.TryRead(out T? item))
148+
{
149+
await receiver(item, index++).ConfigureAwait(false);
150+
}
105151
}
152+
while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false));
106153
}
107-
while (
108-
!cancellationToken.IsCancellationRequested
109-
&& await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false));
110154
}
111155
catch (OperationCanceledException)
112156
{
@@ -293,6 +337,72 @@ public static async ValueTask<long> ReadAllAsync<T>(this ChannelReader<T> reader
293337
return count;
294338
}
295339

340+
/// <inheritdoc cref="ReadAllAsEnumerablesAsync{T}(ChannelReader{T}, Func{IEnumerable{T}, ValueTask}, bool, CancellationToken)"/>
341+
public static ValueTask ReadAllAsEnumerables<T>(this ChannelReader<T> reader,
342+
Action<IEnumerable<T>> receiver,
343+
bool deferredExecution = false,
344+
CancellationToken cancellationToken = default)
345+
{
346+
if (receiver is null) throw new ArgumentNullException(nameof(receiver));
347+
Contract.EndContractBlock();
348+
349+
return reader.ReadAllAsEnumerablesAsync(
350+
e =>
351+
{
352+
receiver(e);
353+
return new ValueTask();
354+
},
355+
deferredExecution,
356+
cancellationToken);
357+
}
358+
359+
/// <summary>
360+
/// Provides an enumerable to a receiver function.<br/>
361+
/// The enumerable will yield items while they are available
362+
/// and complete when none are available.<br/>
363+
/// </summary>
364+
/// <remarks>See <seealso cref="ReadAvailable{T}(ChannelReader{T}, CancellationToken)"/>.</remarks>
365+
/// <typeparam name="T">The item type.</typeparam>
366+
/// <param name="reader">The channel reader to read from.</param>
367+
/// <param name="receiver">The async receiver function.</param>
368+
/// <param name="deferredExecution">If true, calls await Task.Yield() before reading.</param>
369+
/// <param name="cancellationToken">An optional cancellation token.</param>
370+
public static async ValueTask ReadAllAsEnumerablesAsync<T>(this ChannelReader<T> reader,
371+
Func<IEnumerable<T>, ValueTask> receiver,
372+
bool deferredExecution = false,
373+
CancellationToken cancellationToken = default)
374+
{
375+
if (reader is null) throw new ArgumentNullException(nameof(reader));
376+
if (receiver is null) throw new ArgumentNullException(nameof(receiver));
377+
Contract.EndContractBlock();
378+
379+
if (deferredExecution)
380+
await Task.Yield();
381+
382+
try
383+
{
384+
if (cancellationToken.CanBeCanceled)
385+
{
386+
while (
387+
!cancellationToken.IsCancellationRequested
388+
&& await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
389+
{
390+
await receiver(reader.ReadAvailable(cancellationToken)).ConfigureAwait(false);
391+
}
392+
return;
393+
}
394+
395+
while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
396+
{
397+
await receiver(reader.ReadAvailable(cancellationToken)).ConfigureAwait(false);
398+
}
399+
}
400+
catch (OperationCanceledException)
401+
{
402+
// In case WaitToReadAsync is cancelled.
403+
}
404+
}
405+
296406
/// <summary>
297407
/// Reads items from the channel and passes them to the receiver.
298408
/// </summary>

0 commit comments

Comments
 (0)