Skip to content

Commit 54511c1

Browse files
committed
Better cancellationToken support
1 parent d5168cf commit 54511c1

File tree

9 files changed

+57
-48
lines changed

9 files changed

+57
-48
lines changed

src/Abstract/IBackgroundQueue.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@ namespace Soenneker.Utils.BackgroundQueue.Abstract;
1010
/// </summary>
1111
public interface IBackgroundQueue
1212
{
13-
ValueTask QueueValueTask(Func<CancellationToken, ValueTask> workItem);
13+
ValueTask QueueValueTask(Func<CancellationToken, ValueTask> workItem, CancellationToken cancellationToken = default);
1414

15-
ValueTask QueueTask(Func<CancellationToken, Task> workItem);
15+
ValueTask QueueTask(Func<CancellationToken, Task> workItem, CancellationToken cancellationToken = default);
1616

17-
ValueTask<Func<CancellationToken, ValueTask>> DequeueValueTask(CancellationToken cancellationToken);
17+
ValueTask<Func<CancellationToken, ValueTask>> DequeueValueTask(CancellationToken cancellationToken = default);
1818

19-
ValueTask<Func<CancellationToken, Task>> DequeueTask(CancellationToken cancellationToken);
19+
ValueTask<Func<CancellationToken, Task>> DequeueTask(CancellationToken cancellationToken = default);
2020
}
Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System.Threading.Tasks;
1+
using System.Threading;
2+
using System.Threading.Tasks;
23

34
namespace Soenneker.Utils.BackgroundQueue.Abstract;
45

@@ -10,30 +11,30 @@ public interface IQueueInformationUtil
1011
/// <summary>
1112
/// Returns the currently processing lengths via thread safe (and potentially locked) local variables
1213
/// </summary>
13-
ValueTask<(int TaskLength, int ValueTaskLength)> GetCountsOfProcessing();
14+
ValueTask<(int TaskLength, int ValueTaskLength)> GetCountsOfProcessing(CancellationToken cancellationToken = default);
1415

1516
/// <summary>
1617
/// Returns the currently processing lengths via thread safe (and potentially locked) local variables
1718
/// </summary>
18-
ValueTask<bool> IsProcessing();
19+
ValueTask<bool> IsProcessing(CancellationToken cancellationToken = default);
1920

2021
/// <summary>
2122
/// Not to be called outside of <see cref="IBackgroundQueue"/> or <see cref="IQueuedHostedService"/>
2223
/// </summary>
23-
ValueTask<int> IncrementValueTaskCounter();
24+
ValueTask<int> IncrementValueTaskCounter(CancellationToken cancellationToken = default);
2425

2526
/// <summary>
2627
/// Not to be called outside of <see cref="IBackgroundQueue"/> or <see cref="IQueuedHostedService"/>
2728
/// </summary>
28-
ValueTask<int> DecrementValueTaskCounter();
29+
ValueTask<int> DecrementValueTaskCounter(CancellationToken cancellationToken = default);
2930

3031
/// <summary>
3132
/// Not to be called outside of <see cref="IBackgroundQueue"/> or <see cref="IQueuedHostedService"/>
3233
/// </summary>
33-
ValueTask<int> IncrementTaskCounter();
34+
ValueTask<int> IncrementTaskCounter(CancellationToken cancellationToken = default);
3435

3536
/// <summary>
3637
/// Not to be called outside of <see cref="IBackgroundQueue"/> or <see cref="IQueuedHostedService"/>
3738
/// </summary>
38-
ValueTask<int> DecrementTaskCounter();
39+
ValueTask<int> DecrementTaskCounter(CancellationToken cancellationToken = default);
3940
}

src/BackgroundQueue.cs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using Microsoft.Extensions.Logging;
77
using Soenneker.Extensions.Double;
88
using Soenneker.Extensions.MethodInfo;
9+
using Soenneker.Extensions.ValueTask;
910
using Soenneker.Utils.BackgroundQueue.Abstract;
1011

1112
namespace Soenneker.Utils.BackgroundQueue;
@@ -55,11 +56,11 @@ public BackgroundQueue(IConfiguration config, ILogger<BackgroundQueue> logger, I
5556
_taskChannel = Channel.CreateBounded<Func<CancellationToken, Task>>(options);
5657
}
5758

58-
public async ValueTask QueueValueTask(Func<CancellationToken, ValueTask> workItem)
59+
public async ValueTask QueueValueTask(Func<CancellationToken, ValueTask> workItem, CancellationToken cancellationToken = default)
5960
{
6061
// TODO: need to redo this, we're going to get too many warnings
6162

62-
int count = await _informationUtil.IncrementValueTaskCounter().ConfigureAwait(false);
63+
int count = await _informationUtil.IncrementValueTaskCounter(cancellationToken).ConfigureAwait(false);
6364

6465
if (count > _queueWarning)
6566
{
@@ -70,12 +71,12 @@ public async ValueTask QueueValueTask(Func<CancellationToken, ValueTask> workIte
7071
if (_log)
7172
_logger.LogDebug("Queuing ValueTask: {name}", workItem.ToString());
7273

73-
await _valueTaskChannel.Writer.WriteAsync(workItem).ConfigureAwait(false);
74+
await _valueTaskChannel.Writer.WriteAsync(workItem, cancellationToken).NoSync();
7475
}
7576

76-
public async ValueTask QueueTask(Func<CancellationToken, Task> workItem)
77+
public async ValueTask QueueTask(Func<CancellationToken, Task> workItem, CancellationToken cancellationToken = default)
7778
{
78-
int count = await _informationUtil.IncrementTaskCounter().ConfigureAwait(false);
79+
int count = await _informationUtil.IncrementTaskCounter(cancellationToken).NoSync();
7980

8081
if (count > _queueWarning)
8182
{
@@ -86,15 +87,15 @@ public async ValueTask QueueTask(Func<CancellationToken, Task> workItem)
8687
if (_log)
8788
_logger.LogDebug("Queuing Task: {name}", workItem.Method.GetSignature());
8889

89-
await _taskChannel.Writer.WriteAsync(workItem).ConfigureAwait(false);
90+
await _taskChannel.Writer.WriteAsync(workItem, cancellationToken).NoSync();
9091
}
9192

92-
public ValueTask<Func<CancellationToken, ValueTask>> DequeueValueTask(CancellationToken cancellationToken)
93+
public ValueTask<Func<CancellationToken, ValueTask>> DequeueValueTask(CancellationToken cancellationToken = default)
9394
{
9495
return _valueTaskChannel.Reader.ReadAsync(cancellationToken);
9596
}
9697

97-
public ValueTask<Func<CancellationToken, Task>> DequeueTask(CancellationToken cancellationToken)
98+
public ValueTask<Func<CancellationToken, Task>> DequeueTask(CancellationToken cancellationToken = default)
9899
{
99100
return _taskChannel.Reader.ReadAsync(cancellationToken);
100101
}

src/QueueInformationUtil.cs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,18 @@ public QueueInformationUtil(IConfiguration config)
2424
_asyncLock = new AsyncLock();
2525
}
2626

27-
public async ValueTask<(int TaskLength, int ValueTaskLength)> GetCountsOfProcessing()
27+
public async ValueTask<(int TaskLength, int ValueTaskLength)> GetCountsOfProcessing(CancellationToken cancellationToken = default)
2828
{
2929
if (!_lockCounts)
3030
return (_taskCount, _valueTaskCount);
3131

32-
using (await _asyncLock!.LockAsync().ConfigureAwait(false))
32+
using (await _asyncLock!.LockAsync(cancellationToken).ConfigureAwait(false))
3333
{
3434
return (_taskCount, _valueTaskCount);
3535
}
3636
}
3737

38-
public async ValueTask<bool> IsProcessing()
38+
public async ValueTask<bool> IsProcessing(CancellationToken cancellationToken = default)
3939
{
4040
if (!_lockCounts)
4141
{
@@ -45,7 +45,7 @@ public async ValueTask<bool> IsProcessing()
4545
return false;
4646
}
4747

48-
using (await _asyncLock!.LockAsync().ConfigureAwait(false))
48+
using (await _asyncLock!.LockAsync(cancellationToken).ConfigureAwait(false))
4949
{
5050
if (_valueTaskCount > 0 || _taskCount > 0)
5151
return true;
@@ -54,7 +54,7 @@ public async ValueTask<bool> IsProcessing()
5454
}
5555
}
5656

57-
public async ValueTask<int> IncrementValueTaskCounter()
57+
public async ValueTask<int> IncrementValueTaskCounter(CancellationToken cancellationToken = default)
5858
{
5959
if (!_lockCounts)
6060
{
@@ -63,15 +63,15 @@ public async ValueTask<int> IncrementValueTaskCounter()
6363
return _valueTaskCount;
6464
}
6565

66-
using (await _asyncLock!.LockAsync().ConfigureAwait(false))
66+
using (await _asyncLock!.LockAsync(cancellationToken).ConfigureAwait(false))
6767
{
6868
_valueTaskCount++;
6969

7070
return _valueTaskCount;
7171
}
7272
}
7373

74-
public async ValueTask<int> DecrementValueTaskCounter()
74+
public async ValueTask<int> DecrementValueTaskCounter(CancellationToken cancellationToken = default)
7575
{
7676
if (!_lockCounts)
7777
{
@@ -80,15 +80,15 @@ public async ValueTask<int> DecrementValueTaskCounter()
8080
return _valueTaskCount;
8181
}
8282

83-
using (await _asyncLock!.LockAsync().ConfigureAwait(false))
83+
using (await _asyncLock!.LockAsync(cancellationToken).ConfigureAwait(false))
8484
{
8585
_valueTaskCount--;
8686

8787
return _valueTaskCount;
8888
}
8989
}
9090

91-
public async ValueTask<int> IncrementTaskCounter()
91+
public async ValueTask<int> IncrementTaskCounter(CancellationToken cancellationToken = default)
9292
{
9393
if (!_lockCounts)
9494
{
@@ -97,23 +97,23 @@ public async ValueTask<int> IncrementTaskCounter()
9797
return _taskCount;
9898
}
9999

100-
using (await _asyncLock!.LockAsync().ConfigureAwait(false))
100+
using (await _asyncLock!.LockAsync(cancellationToken).ConfigureAwait(false))
101101
{
102102
_taskCount++;
103103

104104
return _taskCount;
105105
}
106106
}
107107

108-
public async ValueTask<int> DecrementTaskCounter()
108+
public async ValueTask<int> DecrementTaskCounter(CancellationToken cancellationToken = default)
109109
{
110110
if (!_lockCounts)
111111
{
112112
Interlocked.Decrement(ref _taskCount);
113113
return _taskCount;
114114
}
115115

116-
using (await _asyncLock!.LockAsync().ConfigureAwait(false))
116+
using (await _asyncLock!.LockAsync(cancellationToken).ConfigureAwait(false))
117117
{
118118
_taskCount--;
119119

src/QueuedHostedService.cs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
using Microsoft.Extensions.Hosting;
66
using Microsoft.Extensions.Logging;
77
using Soenneker.Extensions.MethodInfo;
8+
using Soenneker.Extensions.Task;
9+
using Soenneker.Extensions.ValueTask;
810
using Soenneker.Utils.BackgroundQueue.Abstract;
911

1012
namespace Soenneker.Utils.BackgroundQueue;
@@ -62,15 +64,15 @@ private async Task TaskProcessing(CancellationToken stoppingToken)
6264
{
6365
string? workItemName = null;
6466

65-
workItem = await _queue.DequeueTask(stoppingToken).ConfigureAwait(false);
67+
workItem = await _queue.DequeueTask(stoppingToken).NoSync();
6668

6769
if (_log)
6870
{
6971
workItemName = workItem.Method.GetSignature();
7072
_logger.LogDebug("~~ QueuedHostedService: Starting Task: {item}", workItemName);
7173
}
7274

73-
await workItem(stoppingToken).ConfigureAwait(false);
75+
await workItem(stoppingToken).NoSync();
7476

7577
if (_log)
7678
_logger.LogDebug("~~ QueuedHostedService: Completed Task: {item}", workItemName);
@@ -81,7 +83,7 @@ private async Task TaskProcessing(CancellationToken stoppingToken)
8183
}
8284
finally
8385
{
84-
await _queueInformationUtil.DecrementTaskCounter().ConfigureAwait(false);
86+
await _queueInformationUtil.DecrementTaskCounter(stoppingToken).NoSync();
8587
}
8688
}
8789
}
@@ -96,15 +98,15 @@ private async Task ValueTaskProcessing(CancellationToken stoppingToken)
9698
{
9799
string? workItemName = null;
98100

99-
workItem = await _queue.DequeueValueTask(stoppingToken).ConfigureAwait(false);
101+
workItem = await _queue.DequeueValueTask(stoppingToken).NoSync();
100102

101103
if (_log)
102104
{
103105
workItemName = workItem.Method.GetSignature();
104106
_logger.LogDebug("~~ QueuedHostedService: Starting ValueTask: {item}", workItemName);
105107
}
106108

107-
await workItem(stoppingToken).ConfigureAwait(false);
109+
await workItem(stoppingToken).NoSync();
108110

109111
if (_log)
110112
_logger.LogDebug("~~ QueuedHostedService: Completed ValueTask: {item}", workItemName);
@@ -115,7 +117,7 @@ private async Task ValueTaskProcessing(CancellationToken stoppingToken)
115117
}
116118
finally
117119
{
118-
await _queueInformationUtil.DecrementValueTaskCounter().ConfigureAwait(false);
120+
await _queueInformationUtil.DecrementValueTaskCounter(stoppingToken).NoSync();
119121
}
120122
}
121123
}

src/Soenneker.Utils.BackgroundQueue.csproj

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,12 @@
3636
<None Include="..\README.md" Pack="true" PackagePath="\" />
3737
<None Include="..\LICENSE" Pack="true" PackagePath="\" />
3838
<None Include="..\icon.png" Pack="true" PackagePath="\" />
39-
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.0" />
40-
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.1" />
39+
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.1" />
40+
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.2" />
4141
<PackageReference Include="Nito.AsyncEx" Version="5.1.2" />
42-
<PackageReference Include="Soenneker.Extensions.Configuration" Version="2.1.450" />
43-
<PackageReference Include="Soenneker.Extensions.Double" Version="2.1.332" />
42+
<PackageReference Include="Soenneker.Extensions.Configuration" Version="2.1.464" />
43+
<PackageReference Include="Soenneker.Extensions.Double" Version="2.1.348" />
4444
<PackageReference Include="Soenneker.Extensions.MethodInfo" Version="2.1.319" />
45+
<PackageReference Include="Soenneker.Extensions.ValueTask" Version="2.1.50" />
4546
</ItemGroup>
4647
</Project>

test/Soenneker.Utils.BackgroundQueue.Tests/Fixture.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
using Microsoft.Extensions.DependencyInjection;
44
using Microsoft.Extensions.DependencyInjection.Extensions;
55
using Microsoft.Extensions.Logging;
6+
using Soenneker.Extensions.Task;
7+
using Soenneker.Extensions.ValueTask;
68
using Soenneker.Utils.BackgroundQueue.Extensions;
79
using Soenneker.Utils.BackgroundQueue.Registrars;
810
using Soenneker.Utils.Test;
@@ -15,9 +17,9 @@ public override async Task InitializeAsync()
1517
{
1618
SetupIoC(Services);
1719

18-
await base.InitializeAsync().ConfigureAwait(false);
20+
await base.InitializeAsync().NoSync();
1921

20-
await ServiceProvider!.WarmupAndStartBackgroundQueue().ConfigureAwait(false);
22+
await ServiceProvider!.WarmupAndStartBackgroundQueue().NoSync();
2123
}
2224

2325
private static void SetupIoC(IServiceCollection services)
@@ -34,8 +36,8 @@ private static void SetupIoC(IServiceCollection services)
3436
public override async Task DisposeAsync()
3537
{
3638
if (ServiceProvider != null)
37-
await ServiceProvider.StopBackgroundQueue().ConfigureAwait(false);
39+
await ServiceProvider.StopBackgroundQueue().NoSync();
3840

39-
await base.DisposeAsync().ConfigureAwait(false);
41+
await base.DisposeAsync().NoSync();
4042
}
4143
}

test/Soenneker.Utils.BackgroundQueue.Tests/FixturedUnitTest.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using Serilog.Extensions.Logging;
88
using Serilog.Sinks.XUnit.Injectable.Abstract;
99
using Soenneker.Extensions.ServiceProvider;
10+
using Soenneker.Extensions.ValueTask;
1011
using Soenneker.Tests.Logging;
1112
using Soenneker.Tests.Unit;
1213
using Soenneker.Utils.BackgroundQueue.Abstract;
@@ -77,7 +78,7 @@ public async ValueTask WaitOnQueueToEmpty()
7778

7879
do
7980
{
80-
isProcessing = await _queueInformationUtil.Value.IsProcessing().ConfigureAwait(false);
81+
isProcessing = await _queueInformationUtil.Value.IsProcessing().NoSync();
8182

8283
if (isProcessing)
8384
{
@@ -100,6 +101,6 @@ public async Task DisposeAsync()
100101
GC.SuppressFinalize(this);
101102

102103
if (Scope != null)
103-
await Scope.Value.DisposeAsync().ConfigureAwait(false);
104+
await Scope.Value.DisposeAsync().NoSync();
104105
}
105106
}

test/Soenneker.Utils.BackgroundQueue.Tests/UnitFixture.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using Serilog.Sinks.XUnit.Injectable;
66
using Serilog.Sinks.XUnit.Injectable.Abstract;
77
using Serilog.Sinks.XUnit.Injectable.Extensions;
8+
using Soenneker.Extensions.ValueTask;
89
using Xunit;
910

1011
namespace Soenneker.Utils.BackgroundQueue.Tests;
@@ -49,6 +50,6 @@ public virtual async Task DisposeAsync()
4950
GC.SuppressFinalize(this);
5051

5152
if (ServiceProvider != null)
52-
await ServiceProvider.DisposeAsync().ConfigureAwait(false);
53+
await ServiceProvider.DisposeAsync().NoSync();
5354
}
5455
}

0 commit comments

Comments
 (0)