Skip to content

Commit 3d2a1d8

Browse files
author
Oren (electricessence)
committed
Fixes for join waiting.
1 parent 8027fd2 commit 3d2a1d8

File tree

2 files changed

+15
-13
lines changed

2 files changed

+15
-13
lines changed

Open.ChannelExtensions.Tests/Program.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@ static async Task Main()
3131
Console.WriteLine("Batch + join test 1...");
3232
var sw = Stopwatch.StartNew();
3333
var range = Enumerable
34-
.Range(0, 2000000);
34+
.Range(0, 10000000);
3535

36-
var result = new List<int>(2000000);
36+
var result = new List<int>(10000000);
3737

3838
var total = await range
3939
.ToChannel()
@@ -51,9 +51,9 @@ static async Task Main()
5151
Console.WriteLine("Batch + join test 2...");
5252
var sw = Stopwatch.StartNew();
5353
var range = Enumerable
54-
.Range(0, 2000000);
54+
.Range(0, 10000000);
5555

56-
var result = new List<int>(2000000);
56+
var result = new List<int>(10000000);
5757

5858
var total = await range
5959
.ToChannel()

Open.ChannelExtensions/Extensions.Join.cs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,25 @@ public JoiningChannelReader(ChannelReader<TList> source)
1717
{
1818
_source = source ?? throw new ArgumentNullException(nameof(source));
1919
Contract.EndContractBlock();
20-
}
2120

22-
private readonly ChannelReader<TList> _source;
23-
public override Task Completion
24-
=> _source.Completion.ContinueWith(t =>
21+
_source.Completion.ContinueWith(t =>
2522
{
2623
// Need to be sure writing is done before we continue...
2724
lock (_buffer)
2825
{
2926
_buffer.Writer.Complete(t.Exception);
3027
}
31-
return _buffer.Reader.Completion;
32-
})
33-
.Unwrap();
28+
});
29+
}
30+
31+
private readonly ChannelReader<TList> _source;
32+
public override Task Completion => _buffer.Reader.Completion;
3433

3534
bool TryPipeItems()
3635
{
36+
if (_source.Completion.IsCompleted)
37+
return false;
38+
3739
lock (_buffer)
3840
{
3941
if (!_source.TryRead(out TList batch))
@@ -72,8 +74,8 @@ public override ValueTask<bool> WaitToReadAsync(CancellationToken cancellationTo
7274
return b;
7375

7476
var s = _source.WaitToReadAsync(cancellationToken);
75-
if (s.IsCompletedSuccessfully && !s.Result)
76-
return b;
77+
if (s.IsCompletedSuccessfully)
78+
return s.Result ? new ValueTask<bool>(true) : b;
7779

7880
return WaitCore();
7981

0 commit comments

Comments
 (0)