Skip to content

Commit 6939cbd

Browse files
author
Oren (electricessence)
committed
Final touches before release.
1 parent a8226ae commit 6939cbd

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+142
-24816
lines changed

Open.ChannelExtensions.Tests/BasicTests.cs

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,8 @@ public static async Task BatchJoin(int testSize, int batchSize)
121121
var sw = Stopwatch.StartNew();
122122
var total = await range
123123
.ToChannel(singleReader: true)
124-
.Batch(batchSize)
125-
.Join()
124+
.Batch(batchSize, singleReader: true)
125+
.Join(singleReader: true)
126126
.ReadAll(i => result.Add(i));
127127
sw.Stop();
128128

@@ -133,6 +133,40 @@ public static async Task BatchJoin(int testSize, int batchSize)
133133
Assert.True(result.SequenceEqual(range));
134134
}
135135

136+
[Theory]
137+
[InlineData(11)]
138+
[InlineData(51)]
139+
[InlineData(101)]
140+
[InlineData(1001)]
141+
public static async Task JoinAsync(int repeat)
142+
{
143+
var testSize = repeat * repeat;
144+
var range = Enumerable.Repeat(Samples(), repeat);
145+
var result = new List<int>(testSize);
146+
147+
var sw = Stopwatch.StartNew();
148+
var total = await range
149+
.ToChannel(singleReader: true)
150+
.Join(singleReader: true)
151+
.ReadAll(i => result.Add(i));
152+
sw.Stop();
153+
154+
Console.WriteLine("Channel<IAsyncEnumerable>.Join(): {0}", sw.Elapsed);
155+
Console.WriteLine();
156+
157+
Assert.Equal(testSize, result.Count);
158+
Assert.True(result.SequenceEqual(Enumerable.Repeat(Enumerable.Range(0, repeat), repeat).SelectMany(i => i)));
159+
160+
async IAsyncEnumerable<int> Samples()
161+
{
162+
for (var i = 0; i < repeat; i++)
163+
{
164+
var x = new ValueTask<int>(i);
165+
yield return await x;
166+
}
167+
}
168+
}
169+
136170
[Theory]
137171
[InlineData(testSize1)]
138172
[InlineData(testSize2)]

Open.ChannelExtensions/BufferingChannelReader.cs

Lines changed: 29 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,20 @@ namespace Open.ChannelExtensions
99
abstract class BufferingChannelReader<TIn, TOut> : ChannelReader<TOut>
1010
{
1111
protected ChannelReader<TIn>? Source;
12-
protected readonly Channel<TOut> Buffer;
12+
protected readonly Channel<TOut>? Buffer;
1313
public BufferingChannelReader(ChannelReader<TIn> source, bool singleReader)
1414
{
1515
Source = source ?? throw new ArgumentNullException(nameof(source));
1616
Contract.EndContractBlock();
1717

18-
Buffer = Extensions.CreateChannel<TOut>(-1, singleReader);
19-
2018
if (source.Completion.IsCompleted)
2119
{
22-
Buffer.Writer.Complete(source.Completion.Exception);
20+
Buffer = null;
2321
}
2422
else
2523
{
24+
Buffer = Extensions.CreateChannel<TOut>(-1, singleReader);
25+
2626
source.Completion.ContinueWith(t =>
2727
{
2828
// Need to be sure writing is done before we continue...
@@ -37,55 +37,58 @@ public BufferingChannelReader(ChannelReader<TIn> source, bool singleReader)
3737
}
3838
}
3939

40-
public override Task Completion => Buffer.Reader.Completion;
40+
public override Task Completion => Buffer?.Reader.Completion ?? Task.CompletedTask;
4141

4242
protected abstract bool TryPipeItems();
4343

4444
public override bool TryRead(out TOut item)
4545
{
46-
do
46+
if (Buffer != null) do
4747
{
4848
if (Buffer.Reader.TryRead(out item))
4949
return true;
5050
}
5151
while (TryPipeItems());
5252

53-
#pragma warning disable CS8653 // A default expression introduces a null value for a type parameter.
54-
item = default;
55-
#pragma warning restore CS8653 // A default expression introduces a null value for a type parameter.
53+
item = default!;
5654
return false;
5755
}
5856

5957
public override ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)
6058
{
59+
if (Buffer == null || Buffer.Reader.Completion.IsCompleted)
60+
return new ValueTask<bool>(false);
61+
6162
if (cancellationToken.IsCancellationRequested)
6263
return new ValueTask<bool>(Task.FromCanceled<bool>(cancellationToken));
6364

64-
var source = Source;
6565
var b = Buffer.Reader.WaitToReadAsync(cancellationToken);
66-
if (b.IsCompletedSuccessfully || source==null || source.Completion.IsCompleted)
66+
if (b.IsCompleted)
6767
return b;
6868

69-
var s = source.WaitToReadAsync(cancellationToken);
70-
if (s.IsCompletedSuccessfully)
71-
return s.Result ? new ValueTask<bool>(true) : b;
69+
var source = Source;
70+
if (source == null)
71+
return b;
7272

7373
return WaitCore();
7474

7575
async ValueTask<bool> WaitCore()
7676
{
77-
cancellationToken.ThrowIfCancellationRequested();
78-
79-
// Not sure if there's a better way to 'WhenAny' with a ValueTask yet.
80-
var bt = b.AsTask();
81-
var st = s.AsTask();
82-
var first = await Task.WhenAny(bt, st);
83-
// Either one? Ok go.
84-
if (first.Result) return true;
85-
// Buffer returned false? We're done.
86-
if (first == bt) return false;
87-
// Second return false? Wait for buffer.
88-
return await bt;
77+
78+
start:
79+
80+
if (b.IsCompleted) return await b;
81+
82+
var s = source.WaitToReadAsync(cancellationToken);
83+
if (s.IsCompleted && !b.IsCompleted)
84+
TryPipeItems();
85+
86+
if (b.IsCompleted) return await b;
87+
await s;
88+
if (b.IsCompleted) return await b;
89+
TryPipeItems();
90+
91+
goto start;
8992
}
9093
}
9194
}

Open.ChannelExtensions/Documentation.xml

Lines changed: 22 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Open.ChannelExtensions/Extensions.Batch.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,9 @@ public BatchingChannelReader(ChannelReader<T> source, int batchSize, bool single
2121
_current = source.Completion.IsCompleted ? null : new List<T>(batchSize);
2222
}
2323

24-
2524
protected override bool TryPipeItems()
2625
{
27-
if (_current == null)
26+
if (_current == null || Buffer == null)
2827
return false;
2928

3029
if (Buffer.Reader.Completion.IsCompleted)

Open.ChannelExtensions/Extensions.Filter.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,7 @@ public override bool TryRead(out T item)
2929
return true;
3030
}
3131

32-
#pragma warning disable CS8653 // A default expression introduces a null value for a type parameter.
33-
item = default;
34-
#pragma warning restore CS8653 // A default expression introduces a null value for a type parameter.
32+
item = default!;
3533
return false;
3634
}
3735

Open.ChannelExtensions/Extensions.Join.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public JoiningChannelReader(ChannelReader<TList> source, bool singleReader) : ba
1616
protected override bool TryPipeItems()
1717
{
1818
var source = Source;
19-
if (source==null || source.Completion.IsCompleted)
19+
if (source == null || source.Completion.IsCompleted || Buffer == null)
2020
return false;
2121

2222
lock (Buffer)
@@ -91,20 +91,20 @@ public static ChannelReader<T> Join<T>(this ChannelReader<T[]> source, bool sing
9191
/// </summary>
9292
/// <typeparam name="T">The result type.</typeparam>
9393
/// <param name="source">The source reader.</param>
94-
/// <param name="bufferSize">The capacity of the resultant channel.</param>
9594
/// <param name="singleReader">True will cause the resultant reader to optimize for the assumption that no concurrent read operations will occur.</param>
9695
/// <returns>A channel reader containing the joined results.</returns>
97-
public static ChannelReader<T> Join<T>(this ChannelReader<IAsyncEnumerable<T>> source, int bufferSize = 100, bool singleReader = false)
96+
public static ChannelReader<T> Join<T>(this ChannelReader<IAsyncEnumerable<T>> source, bool singleReader = false)
9897
{
99-
var buffer = CreateChannel<T>(bufferSize, singleReader);
98+
var buffer = CreateChannel<T>(1, singleReader);
10099

101-
Task.Run(async () =>
102-
await source.ReadAllAsync(
100+
source
101+
.ReadAllAsync(
103102
async (batch, i) =>
104103
{
105104
await foreach (var e in batch)
106105
await buffer.Writer.WriteAsync(e).ConfigureAwait(false);
107-
}))
106+
})
107+
.AsTask()
108108
.ContinueWith(
109109
t => buffer.CompleteAsync(t.Exception), TaskContinuationOptions.ExecuteSynchronously);
110110

Open.ChannelExtensions/Extensions.Transform.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,7 @@ public override bool TryRead(out TOut item)
2929
return true;
3030
}
3131

32-
#pragma warning disable CS8653 // A default expression introduces a null value for a type parameter.
33-
item = default;
34-
#pragma warning restore CS8653 // A default expression introduces a null value for a type parameter.
32+
item = default!;
3533
return false;
3634
}
3735

Open.ChannelExtensions/Extensions.TypeFilter.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,7 @@ public override bool TryRead(out T item)
3030
}
3131
}
3232

33-
#pragma warning disable CS8653 // A default expression introduces a null value for a type parameter.
34-
item = default;
35-
#pragma warning restore CS8653 // A default expression introduces a null value for a type parameter.
33+
item = default!;
3634
return false;
3735
}
3836

Open.ChannelExtensions/Extensions.WriteConcurrently.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,7 @@ static bool TryMoveNextSynchronized<T>(IEnumerator<T> source, out T value)
9191
}
9292
}
9393

94-
#pragma warning disable CS8653 // A default expression introduces a null value for a type parameter.
95-
value = default;
96-
#pragma warning restore CS8653 // A default expression introduces a null value for a type parameter.
94+
value = default!;
9795
return false;
9896
}
9997

Open.ChannelExtensions/Extensions._.cs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Generic;
33
using System.IO;
4+
using System.Runtime.CompilerServices;
45
using System.Threading;
56
using System.Threading.Channels;
67
using System.Threading.Tasks;
@@ -190,6 +191,48 @@ public static ChannelReader<T> ToChannel<T>(this IAsyncEnumerable<T> source,
190191
bool deferredExecution = false)
191192
=> CreateChannel<T>(capacity, singleReader)
192193
.Source(source, cancellationToken, deferredExecution);
194+
195+
/// <summary>
196+
/// Iterates over the results in a ChannelReader.
197+
/// Provided as an alternative to .ReadAllAsync() which at the time of publishing this, only exists in .NET Core 3.0 and not .NET Standard 2.1
198+
/// </summary>
199+
/// <typeparam name="T">The output type of the channel.</typeparam>
200+
/// <param name="reader">The reader to read from.</param>
201+
/// <param name="cancellationToken">An optional cancellation token that will break out of the iteration.</param>
202+
/// <returns>An IAsyncEnumerable for iterating the channel.</returns>
203+
public static async IAsyncEnumerable<T> AsAsyncEnumerable<T>(this ChannelReader<T> reader, [EnumeratorCancellation] CancellationToken cancellationToken = default)
204+
{
205+
do
206+
{
207+
while (!cancellationToken.IsCancellationRequested && reader.TryRead(out var item))
208+
yield return item;
209+
}
210+
while (
211+
!cancellationToken.IsCancellationRequested
212+
&& await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false));
213+
}
214+
215+
/// <summary>
216+
/// Iterates over the results in a Channel.
217+
/// Provided as an alternative to .ReadAllAsync() which at the time of publishing this, only exists in .NET Core 3.0 and not .NET Standard 2.1
218+
/// </summary>
219+
/// <typeparam name="TIn">The type recieved by the source channel.</typeparam>
220+
/// <typeparam name="TOut">The outgoing type from the source channel.</typeparam>
221+
/// <param name="channel">The reader to read from.</param>
222+
/// <param name="cancellationToken">An optional cancellation token that will break out of the iteration.</param>
223+
/// <returns>An IAsyncEnumerable for iterating the channel.</returns>
224+
public static async IAsyncEnumerable<TOut> AsAsyncEnumerable<TIn, TOut>(this Channel<TIn, TOut> channel, [EnumeratorCancellation] CancellationToken cancellationToken = default)
225+
{
226+
var reader = channel.Reader;
227+
do
228+
{
229+
while (!cancellationToken.IsCancellationRequested && reader.TryRead(out var item))
230+
yield return item;
231+
}
232+
while (
233+
!cancellationToken.IsCancellationRequested
234+
&& await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false));
235+
}
193236
#endif
194237

195238
/// <summary>

0 commit comments

Comments
 (0)