Skip to content

Commit 91a741e

Browse files
author
Oren (electricessence)
committed
Cleanup.
1 parent 64668e5 commit 91a741e

File tree

3 files changed

+16
-39
lines changed

3 files changed

+16
-39
lines changed

Open.ChannelExtensions/BufferingChannelReader.cs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ namespace Open.ChannelExtensions
88
{
99
abstract class BufferingChannelReader<TIn, TOut> : ChannelReader<TOut>
1010
{
11-
protected readonly ChannelReader<TIn> Source;
11+
protected ChannelReader<TIn>? Source;
1212
protected readonly Channel<TOut> Buffer;
1313
public BufferingChannelReader(ChannelReader<TIn> source, bool singleReader)
1414
{
@@ -17,20 +17,22 @@ public BufferingChannelReader(ChannelReader<TIn> source, bool singleReader)
1717

1818
Buffer = Extensions.CreateChannel<TOut>(-1, singleReader);
1919

20-
if (Source.Completion.IsCompleted)
20+
if (source.Completion.IsCompleted)
2121
{
22-
Buffer.Writer.Complete(Source.Completion.Exception);
22+
Buffer.Writer.Complete(source.Completion.Exception);
2323
}
2424
else
2525
{
26-
Source.Completion.ContinueWith(t =>
26+
source.Completion.ContinueWith(t =>
2727
{
2828
// Need to be sure writing is done before we continue...
2929
lock (Buffer)
3030
{
3131
while (TryPipeItems()) { }
3232
Buffer.Writer.Complete(t.Exception);
3333
}
34+
35+
Source = null;
3436
});
3537
}
3638
}
@@ -59,11 +61,12 @@ public override ValueTask<bool> WaitToReadAsync(CancellationToken cancellationTo
5961
if (cancellationToken.IsCancellationRequested)
6062
return new ValueTask<bool>(Task.FromCanceled<bool>(cancellationToken));
6163

64+
var source = Source;
6265
var b = Buffer.Reader.WaitToReadAsync(cancellationToken);
63-
if (b.IsCompletedSuccessfully || Source.Completion.IsCompleted)
66+
if (b.IsCompletedSuccessfully || source==null || source.Completion.IsCompleted)
6467
return b;
6568

66-
var s = Source.WaitToReadAsync(cancellationToken);
69+
var s = source.WaitToReadAsync(cancellationToken);
6770
if (s.IsCompletedSuccessfully)
6871
return s.Result ? new ValueTask<bool>(true) : b;
6972

Open.ChannelExtensions/Extensions.Batch.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ protected override bool TryPipeItems()
3939
if (Buffer.Reader.Completion.IsCompleted)
4040
return false;
4141

42-
if (Source.Completion.IsCompleted)
42+
var source = Source;
43+
if (source == null || source.Completion.IsCompleted)
4344
{
4445
c.TrimExcess();
4546
_current = null;
@@ -50,7 +51,7 @@ protected override bool TryPipeItems()
5051
return true;
5152
}
5253

53-
while (Source.TryRead(out T item))
54+
while (source.TryRead(out T item))
5455
{
5556
if (c.Count == _batchSize)
5657
{

Open.ChannelExtensions/Extensions.Join.cs

Lines changed: 4 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@ public JoiningChannelReader(ChannelReader<TList> source, bool singleReader) : ba
1717

1818
protected override bool TryPipeItems()
1919
{
20-
if (Source.Completion.IsCompleted)
20+
var source = Source;
21+
if (source==null || source.Completion.IsCompleted)
2122
return false;
2223

2324
lock (Buffer)
2425
{
25-
if (!Source.TryRead(out TList batch))
26+
if (!source.TryRead(out TList batch))
2627
return false;
2728

2829
foreach (var i in batch)
@@ -36,34 +37,6 @@ protected override bool TryPipeItems()
3637
}
3738
}
3839

39-
const int DefaultBufferSize = 100;
40-
41-
static ChannelReader<T> JoinInternal<T, TEnumerable>(ChannelReader<TEnumerable> source, int bufferSize, bool singleReader)
42-
where TEnumerable : IEnumerable<T>
43-
{
44-
if (source == null) throw new ArgumentNullException(nameof(source));
45-
if (bufferSize < 1) throw new ArgumentOutOfRangeException(nameof(bufferSize), bufferSize, "Must be at least 1.");
46-
Contract.EndContractBlock();
47-
48-
var buffer = CreateChannel<T>(bufferSize, singleReader);
49-
var writer = buffer.Writer;
50-
51-
Task.Run(async () =>
52-
await source.ReadAllAsync(
53-
async (batch, i) =>
54-
{
55-
foreach (var e in batch)
56-
{
57-
if(!writer.TryWrite(e))
58-
await writer.WriteAsync(e).ConfigureAwait(false);
59-
}
60-
}))
61-
.ContinueWith(
62-
t => buffer.CompleteAsync(t.Exception), TaskContinuationOptions.ExecuteSynchronously);
63-
64-
return buffer.Reader;
65-
}
66-
6740
/// <summary>
6841
/// Joins collections of the same type into a single channel reader in the order provided.
6942
/// </summary>
@@ -123,7 +96,7 @@ public static ChannelReader<T> Join<T>(this ChannelReader<T[]> source, bool sing
12396
/// <param name="bufferSize">The capacity of the resultant channel.</param>
12497
/// <param name="singleReader">True will cause the resultant reader to optimize for the assumption that no concurrent read operations will occur.</param>
12598
/// <returns>A channel reader containing the joined results.</returns>
126-
public static ChannelReader<T> Join<T>(this ChannelReader<IAsyncEnumerable<T>> source, int bufferSize = DefaultBufferSize, bool singleReader = false)
99+
public static ChannelReader<T> Join<T>(this ChannelReader<IAsyncEnumerable<T>> source, int bufferSize = 100, bool singleReader = false)
127100
{
128101
var buffer = CreateChannel<T>(bufferSize, singleReader);
129102

0 commit comments

Comments
 (0)