Skip to content

Commit deb3ee0

Browse files
committed
Readme
1 parent 3dbc96f commit deb3ee0

File tree

4 files changed

+151
-19
lines changed

4 files changed

+151
-19
lines changed

README.md

Lines changed: 104 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,111 @@
55
# ![](https://user-images.githubusercontent.com/4441470/224455560-91ed3ee7-f510-4041-a8d2-3fc093025112.png) Soenneker.Utils.BackgroundQueue
66
### A high-performance background Task/ValueTask queue
77

8+
## Overview
9+
`BackgroundQueue` provides an efficient way to manage background task execution in .NET applications. It helps prevent application overload by processing tasks in a controlled, asynchronous manner.
10+
11+
## Features
12+
- Supports both `ValueTask` and `Task` types.
13+
- Configurable queue size to limit resource usage.
14+
- Built-in tracking of running and pending tasks.
15+
- Extension methods for easy setup and management.
16+
- Includes a hosted service for automatic background processing.
17+
818
## Installation
919

10-
```
20+
```sh
1121
dotnet add package Soenneker.Utils.BackgroundQueue
1222
```
23+
24+
Register the `BackgroundQueue`:
25+
```csharp
26+
void ConfigureServices(IServiceCollection services)
27+
{
28+
services.AddBackgroundQueueAsSingleton();
29+
}
30+
```
31+
32+
### Starting
33+
```csharp
34+
await serviceProvider.WarmupAndStartBackgroundQueue(cancellationToken);
35+
```
36+
37+
For synchronous start:
38+
```csharp
39+
serviceProvider.WarmupAndStartBackgroundQueueSync(cancellationToken);
40+
```
41+
42+
### Stopping
43+
To stop the service:
44+
```csharp
45+
await serviceProvider.StopBackgroundQueue(cancellationToken);
46+
```
47+
48+
For synchronous stop:
49+
```csharp
50+
serviceProvider.StopBackgroundQueueSync(cancellationToken);
51+
```
52+
53+
## Configuration
54+
Configure the queue length and task tracking settings in your application:
55+
56+
```json
57+
{
58+
"Background": {
59+
"QueueLength": 5000,
60+
"LockCounts": false,
61+
"Log": false
62+
}
63+
}
64+
```
65+
66+
- `QueueLength`: Defines the maximum number of tasks in the queue.
67+
- `LockCounts`: Enables thread-safe counting of running tasks.
68+
- `Log`: Outputs task tracking information to `ILogger`
69+
70+
## Initializing the Queue
71+
To use `BackgroundQueue`, you probably want to inject it via your constructor:
72+
73+
```csharp
74+
IBackgroundQueue _queue;
75+
76+
void MyClass(IBackgroundQueue queue)
77+
{
78+
_queue = queue;
79+
}
80+
```
81+
82+
## Queueing Tasks
83+
84+
### Queuing a `ValueTask`
85+
Rather than wrapping the task, you can elide it directly to avoid an extra state machine:
86+
```csharp
87+
await _queue.QueueValueTask(_ => someValueTask(), cancellationToken);
88+
```
89+
90+
### Queuing a `Task`
91+
Similarly, for `Task`:
92+
```csharp
93+
await _queue.QueueTask(_ => someTask(), cancellationToken);
94+
```
95+
96+
## Waiting for Queue to Empty
97+
To ensure all queued tasks finish before proceeding:
98+
```csharp
99+
await queue.WaitUntilEmpty(cancellationToken);
100+
```
101+
102+
## Task Tracking
103+
The queue tracks:
104+
- The number of active `ValueTask` and `Task` instances.
105+
- Whether any tasks are still processing.
106+
107+
To check if tasks are running:
108+
```csharp
109+
bool isProcessing = await queueInformationUtil.IsProcessing(cancellationToken);
110+
```
111+
112+
To get current task counts:
113+
```csharp
114+
var (taskCount, valueTaskCount) = await queueInformationUtil.GetCountsOfProcessing(cancellationToken);
115+
```

src/Abstract/IBackgroundQueue.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,6 @@ public interface IBackgroundQueue
4141
/// <param name="cancellationToken">An optional token to cancel the operation.</param>
4242
/// <returns>A <see cref="ValueTask"/> containing a function that represents the dequeued work item, which accepts a <see cref="CancellationToken"/> and returns a <see cref="Task"/>.</returns>
4343
ValueTask<Func<CancellationToken, Task>> DequeueTask(CancellationToken cancellationToken = default);
44+
45+
ValueTask WaitUntilEmpty(CancellationToken cancellationToken = default);
4446
}

src/BackgroundQueue.cs

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using Microsoft.Extensions.Logging;
77
using Soenneker.Extensions.Double;
88
using Soenneker.Extensions.MethodInfo;
9+
using Soenneker.Extensions.Task;
910
using Soenneker.Extensions.ValueTask;
1011
using Soenneker.Utils.BackgroundQueue.Abstract;
1112

@@ -21,14 +22,14 @@ public class BackgroundQueue : IBackgroundQueue
2122
private readonly int _queueWarning;
2223

2324
private readonly ILogger<BackgroundQueue> _logger;
24-
private readonly IQueueInformationUtil _informationUtil;
25+
private readonly IQueueInformationUtil _queueInformationUtil;
2526

2627
private readonly bool _log;
2728

28-
public BackgroundQueue(IConfiguration config, ILogger<BackgroundQueue> logger, IQueueInformationUtil informationUtil)
29+
public BackgroundQueue(IConfiguration config, ILogger<BackgroundQueue> logger, IQueueInformationUtil queueInformationUtil)
2930
{
3031
_logger = logger;
31-
_informationUtil = informationUtil;
32+
_queueInformationUtil = queueInformationUtil;
3233

3334
var configQueueLength = config.GetValue<int>("Background:QueueLength");
3435
_log = config.GetValue<bool>("Background:Log");
@@ -60,7 +61,7 @@ public async ValueTask QueueValueTask(Func<CancellationToken, ValueTask> workIte
6061
{
6162
// TODO: need to redo this, we're going to get too many warnings
6263

63-
int count = await _informationUtil.IncrementValueTaskCounter(cancellationToken).NoSync();
64+
int count = await _queueInformationUtil.IncrementValueTaskCounter(cancellationToken).NoSync();
6465

6566
if (count > _queueWarning)
6667
{
@@ -76,7 +77,7 @@ public async ValueTask QueueValueTask(Func<CancellationToken, ValueTask> workIte
7677

7778
public async ValueTask QueueTask(Func<CancellationToken, Task> workItem, CancellationToken cancellationToken = default)
7879
{
79-
int count = await _informationUtil.IncrementTaskCounter(cancellationToken).NoSync();
80+
int count = await _queueInformationUtil.IncrementTaskCounter(cancellationToken).NoSync();
8081

8182
if (count > _queueWarning)
8283
{
@@ -99,4 +100,30 @@ public ValueTask<Func<CancellationToken, Task>> DequeueTask(CancellationToken ca
99100
{
100101
return _taskChannel.Reader.ReadAsync(cancellationToken);
101102
}
103+
104+
public async ValueTask WaitUntilEmpty(CancellationToken cancellationToken = default)
105+
{
106+
const int delayMs = 500;
107+
108+
bool isProcessing;
109+
110+
do
111+
{
112+
isProcessing = await _queueInformationUtil.IsProcessing(cancellationToken).ConfigureAwait(false);
113+
114+
if (isProcessing)
115+
{
116+
if (_log)
117+
{
118+
_logger.LogDebug("Delaying for {ms}ms (Background queue emptying...)...", delayMs);
119+
}
120+
121+
await Task.Delay(delayMs, cancellationToken).NoSync();
122+
}
123+
else
124+
{
125+
_logger.LogDebug("Background queue is empty; continuing");
126+
}
127+
} while (isProcessing);
128+
}
102129
}

src/QueuedHostedService.cs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,36 +43,36 @@ public override Task StartAsync(CancellationToken cancellationToken)
4343
/// <summary>
4444
/// Triggered when the application host is ready to start the service.
4545
/// </summary>
46-
protected override Task ExecuteAsync(CancellationToken stoppingToken)
46+
protected override Task ExecuteAsync(CancellationToken cancellationToken)
4747
{
4848
if (_log)
4949
_logger.LogDebug("~~ QueuedHostedService: Executing...");
5050

51-
Task valueTaskProcessing = ValueTaskProcessing(stoppingToken);
52-
Task taskProcessing = TaskProcessing(stoppingToken);
51+
Task valueTaskProcessing = ValueTaskProcessing(cancellationToken);
52+
Task taskProcessing = TaskProcessing(cancellationToken);
5353

5454
return Task.WhenAll(valueTaskProcessing, taskProcessing);
5555
}
5656

57-
private async Task TaskProcessing(CancellationToken stoppingToken)
57+
private async Task TaskProcessing(CancellationToken cancellationToken)
5858
{
59-
while (!stoppingToken.IsCancellationRequested)
59+
while (!cancellationToken.IsCancellationRequested)
6060
{
6161
Func<CancellationToken, Task>? workItem = null;
6262

6363
try
6464
{
6565
string? workItemName = null;
6666

67-
workItem = await _queue.DequeueTask(stoppingToken).NoSync();
67+
workItem = await _queue.DequeueTask(cancellationToken).NoSync();
6868

6969
if (_log)
7070
{
7171
workItemName = workItem.Method.GetSignature();
7272
_logger.LogDebug("~~ QueuedHostedService: Starting Task: {item}", workItemName);
7373
}
7474

75-
await workItem(stoppingToken).NoSync();
75+
await workItem(cancellationToken).NoSync();
7676

7777
if (_log)
7878
_logger.LogDebug("~~ QueuedHostedService: Completed Task: {item}", workItemName);
@@ -83,30 +83,30 @@ private async Task TaskProcessing(CancellationToken stoppingToken)
8383
}
8484
finally
8585
{
86-
await _queueInformationUtil.DecrementTaskCounter(stoppingToken).NoSync();
86+
await _queueInformationUtil.DecrementTaskCounter(cancellationToken).NoSync();
8787
}
8888
}
8989
}
9090

91-
private async Task ValueTaskProcessing(CancellationToken stoppingToken)
91+
private async Task ValueTaskProcessing(CancellationToken cancellationToken)
9292
{
93-
while (!stoppingToken.IsCancellationRequested)
93+
while (!cancellationToken.IsCancellationRequested)
9494
{
9595
Func<CancellationToken, ValueTask>? workItem = null;
9696

9797
try
9898
{
9999
string? workItemName = null;
100100

101-
workItem = await _queue.DequeueValueTask(stoppingToken).NoSync();
101+
workItem = await _queue.DequeueValueTask(cancellationToken).NoSync();
102102

103103
if (_log)
104104
{
105105
workItemName = workItem.Method.GetSignature();
106106
_logger.LogDebug("~~ QueuedHostedService: Starting ValueTask: {item}", workItemName);
107107
}
108108

109-
await workItem(stoppingToken).NoSync();
109+
await workItem(cancellationToken).NoSync();
110110

111111
if (_log)
112112
_logger.LogDebug("~~ QueuedHostedService: Completed ValueTask: {item}", workItemName);
@@ -117,7 +117,7 @@ private async Task ValueTaskProcessing(CancellationToken stoppingToken)
117117
}
118118
finally
119119
{
120-
await _queueInformationUtil.DecrementValueTaskCounter(stoppingToken).NoSync();
120+
await _queueInformationUtil.DecrementValueTaskCounter(cancellationToken).NoSync();
121121
}
122122
}
123123
}

0 commit comments

Comments
 (0)