Skip to content

Commit 9678c67

Browse files
author
Oren (electricessence)
committed
Ensured forced update is working.
1 parent 47372e7 commit 9678c67

File tree

4 files changed

+94
-38
lines changed

4 files changed

+94
-38
lines changed
Lines changed: 51 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Diagnostics.Contracts;
4+
using System.Threading;
45
using System.Threading.Channels;
6+
using System.Threading.Tasks;
57

68
namespace Open.ChannelExtensions
79
{
@@ -12,7 +14,7 @@ namespace Open.ChannelExtensions
1214
public class BatchingChannelReader<T> : BufferingChannelReader<T, List<T>>
1315
{
1416
private readonly int _batchSize;
15-
private List<T>? _current;
17+
private List<T>? _batch;
1618

1719
/// <summary>
1820
/// Constructs a BatchingChannelReader.
@@ -24,7 +26,6 @@ public BatchingChannelReader(ChannelReader<T> source, int batchSize, bool single
2426
Contract.EndContractBlock();
2527

2628
_batchSize = batchSize;
27-
_current = source.Completion.IsCompleted ? null : new List<T>(batchSize);
2829
}
2930

3031
/// <summary>
@@ -35,54 +36,53 @@ public bool ForceBatch()
3536
{
3637
if (Buffer == null || Buffer.Reader.Completion.IsCompleted) return false;
3738
if (TryPipeItems()) return true;
39+
if (_batch == null) return false;
3840

3941
lock (Buffer)
4042
{
4143
if (Buffer.Reader.Completion.IsCompleted) return false;
4244
if (TryPipeItems()) return true;
43-
var c = _current;
44-
if (c == null || c.Count == 0 || Buffer.Reader.Completion.IsCompleted)
45+
var c = _batch;
46+
if (c == null || Buffer.Reader.Completion.IsCompleted)
4547
return false;
4648
c.TrimExcess();
47-
_current = new List<T>(_batchSize);
48-
Buffer.Writer.TryWrite(c);
49+
_batch = null;
50+
return Buffer.Writer.TryWrite(c); // Should always be true at this point.
4951
}
50-
51-
return true;
5252
}
5353

5454
/// <inheritdoc />
5555
protected override bool TryPipeItems()
5656
{
57-
if (_current == null || Buffer == null || Buffer.Reader.Completion.IsCompleted)
57+
if (Buffer == null || Buffer.Reader.Completion.IsCompleted)
5858
return false;
5959

6060
lock (Buffer)
6161
{
62-
var c = _current;
63-
if (c == null || Buffer.Reader.Completion.IsCompleted)
64-
return false;
62+
if (Buffer.Reader.Completion.IsCompleted) return false;
6563

64+
var c = _batch;
6665
var source = Source;
6766
if (source == null || source.Completion.IsCompleted)
6867
{
6968
// All finished, release the last batch to the buffer.
69+
if (c == null) return false;
70+
7071
c.TrimExcess();
71-
_current = null;
72-
if (c.Count == 0)
73-
return false;
72+
_batch = null;
7473

7574
Buffer.Writer.TryWrite(c);
7675
return true;
7776
}
7877

7978
while (source.TryRead(out T item))
8079
{
81-
c.Add(item);
80+
if (c == null) _batch = c = new List<T>(_batchSize) { item };
81+
else c.Add(item);
8282

8383
if (c.Count == _batchSize)
8484
{
85-
_current = new List<T>(_batchSize);
85+
_batch = null;
8686
Buffer.Writer.TryWrite(c);
8787
return true;
8888
}
@@ -91,5 +91,39 @@ protected override bool TryPipeItems()
9191
return false;
9292
}
9393
}
94+
95+
/// <inheritdoc />
96+
protected override async ValueTask<bool> WaitToReadAsyncCore(ValueTask<bool> bufferWait, CancellationToken cancellationToken)
97+
{
98+
99+
var source = Source;
100+
if (source == null) return await bufferWait;
101+
102+
var b = bufferWait.AsTask();
103+
using var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
104+
var token = tokenSource.Token;
105+
106+
start:
107+
108+
if (b.IsCompleted) return await b.ConfigureAwait(false);
109+
110+
var s = source.WaitToReadAsync(token);
111+
if (s.IsCompleted && !b.IsCompleted) TryPipeItems();
112+
113+
if (b.IsCompleted)
114+
{
115+
tokenSource.Cancel();
116+
return await b.ConfigureAwait(false);
117+
}
118+
await Task.WhenAny(s.AsTask(), b).ConfigureAwait(false);
119+
if (b.IsCompleted)
120+
{
121+
tokenSource.Cancel();
122+
return await b.ConfigureAwait(false);
123+
}
124+
125+
TryPipeItems();
126+
goto start;
127+
}
94128
}
95129
}

Open.ChannelExtensions/BufferingChannelReader.cs

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Diagnostics;
23
using System.Diagnostics.Contracts;
34
using System.Threading;
45
using System.Threading.Channels;
@@ -35,10 +36,12 @@ protected BufferingChannelReader(ChannelReader<TIn> source, bool singleReader, b
3536
if (source.Completion.IsCompleted)
3637
{
3738
Buffer = null;
39+
_completion = Task.CompletedTask;
3840
}
3941
else
4042
{
4143
Buffer = Extensions.CreateChannel<TOut>(-1, singleReader, syncCont);
44+
_completion = Buffer.Reader.Completion;
4245

4346
source.Completion.ContinueWith(t =>
4447
{
@@ -54,8 +57,9 @@ protected BufferingChannelReader(ChannelReader<TIn> source, bool singleReader, b
5457
}
5558
}
5659

60+
private readonly Task _completion;
5761
/// <inheritdoc />
58-
public override Task Completion => Buffer?.Reader.Completion ?? Task.CompletedTask;
62+
public override Task Completion => _completion;
5963

6064
/// <summary>
6165
/// The method that triggers adding entries to the buffer.
@@ -87,33 +91,42 @@ public override ValueTask<bool> WaitToReadAsync(CancellationToken cancellationTo
8791
return new ValueTask<bool>(Task.FromCanceled<bool>(cancellationToken));
8892

8993
var b = Buffer.Reader.WaitToReadAsync(cancellationToken);
90-
if (b.IsCompleted)
91-
return b;
94+
return b.IsCompleted ? b : WaitToReadAsyncCore(b, cancellationToken);
95+
}
9296

97+
/// <summary>
98+
/// Implementation for waiting.
99+
/// Can be overridden.
100+
/// </summary>
101+
protected virtual async ValueTask<bool> WaitToReadAsyncCore(ValueTask<bool> bufferWait, CancellationToken cancellationToken)
102+
{
93103
var source = Source;
94-
if (source == null)
95-
return b;
96-
97-
return WaitCore();
104+
if (source == null) return await bufferWait;
98105

99-
async ValueTask<bool> WaitCore()
100-
{
106+
using var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
107+
var token = tokenSource.Token;
101108

102-
start:
109+
start:
103110

104-
if (b.IsCompleted) return await b;
111+
if (bufferWait.IsCompleted) return await bufferWait;
105112

106-
var s = source!.WaitToReadAsync(cancellationToken);
107-
if (s.IsCompleted && !b.IsCompleted)
108-
TryPipeItems();
113+
var s = source.WaitToReadAsync(token);
114+
if (s.IsCompleted && !bufferWait.IsCompleted) TryPipeItems();
109115

110-
if (b.IsCompleted) return await b;
111-
await s;
112-
if (b.IsCompleted) return await b;
113-
TryPipeItems();
114-
115-
goto start;
116+
if (bufferWait.IsCompleted)
117+
{
118+
tokenSource.Cancel();
119+
return await bufferWait.ConfigureAwait(false);
116120
}
121+
await s;
122+
if (bufferWait.IsCompleted)
123+
{
124+
tokenSource.Cancel();
125+
return await bufferWait.ConfigureAwait(false);
126+
}
127+
TryPipeItems();
128+
129+
goto start;
117130
}
118131
}
119132
}

Open.ChannelExtensions/Documentation.xml

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Open.ChannelExtensions/Open.ChannelExtensions.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
<RepositoryType>git</RepositoryType>
1919
<PackageTags>channels dotnet threading tasks extensions</PackageTags>
2020
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
21-
<Version>3.3.2</Version>
21+
<Version>3.4.0</Version>
2222
<PackageLicenseExpression>MIT</PackageLicenseExpression>
2323
<PackageReleaseNotes></PackageReleaseNotes>
2424
<RepositoryUrl>https://github.com/electricessence/Open.ChannelExtensions</RepositoryUrl>

0 commit comments

Comments
 (0)