Skip to content

Commit 8a3a921

Browse files
author
Meyn
committed
Introduced DynamicPriorityChannel to manage dynamic priority handling.
Implemented RequestContainer interface into RequestHandler to add request logic. Changed RequestPriority to a floating type for enhanced flexibility.
1 parent a6c2dbb commit 8a3a921

12 files changed

+2183
-103
lines changed

Requests/Channel/ConccurentQuene.cs

Lines changed: 877 additions & 0 deletions
Large diffs are not rendered by default.

Requests/Channel/ConcurrentPriorityQueue.cs

Lines changed: 498 additions & 0 deletions
Large diffs are not rendered by default.

Requests/Channel/DynamicPriorityChannel.cs

Lines changed: 376 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 79 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
using System.Collections.Concurrent;
2-
using System.Diagnostics.CodeAnalysis;
1+
using System.Diagnostics.CodeAnalysis;
32
using System.Threading.Channels;
43

54
namespace Requests.Channel
@@ -9,12 +8,12 @@ namespace Requests.Channel
98
/// </summary>
109
/// <param name="Priority">Priority of the item</param>
1110
/// <param name="Item">Item that the channel uses</param>
12-
public record PriorityItem<TElement>(int Priority, TElement Item);
11+
public record PriorityItem<TElement>(float Priority, TElement Item);
1312
/// <summary>
1413
/// A implementation of channel with a priority listing
1514
/// </summary>
1615
/// <typeparam name="TElement"></typeparam>
17-
public class PriorityChannel<TElement> : Channel<PriorityItem<TElement>>
16+
public class FixedPriorityChannel<TElement> : Channel<PriorityItem<TElement>>, IPriorityChannel<TElement>
1817
{
1918
/// <summary>Task that indicates the channel has completed.</summary>
2019
private readonly TaskCompletionSource _completion = new(TaskCreationOptions.RunContinuationsAsynchronously);
@@ -23,41 +22,60 @@ public class PriorityChannel<TElement> : Channel<PriorityItem<TElement>>
2322
/// <summary>Readers blocked reading from the channel.</summary>
2423
private readonly Deque<AsyncOperation<PriorityItem<TElement>>> _blockedReaders = new();
2524

26-
/// <summary>Readers waiting for a notification that data is available.</summary>
25+
/// <summary>
26+
/// The tail of the linked list of readers waiting for data availability notifications.
27+
/// </summary>
2728
private AsyncOperation<bool>? _waitingReadersTail;
28-
/// <summary>Set to non-null once Complete has been called.</summary>
29+
30+
/// <summary>
31+
/// Indicates whether the writing process has completed. Non-null if writing is done.
32+
/// </summary>
2933
private Exception? _doneWriting;
3034

31-
// The number of queues we store internally.
35+
/// <summary>
36+
/// The number of priority levels used internally for managing data queues.
37+
/// </summary>
3238
private readonly int _priorityCount = 0;
33-
private int m_count = 0;
3439

35-
internal ParallelChannelOptions Options { get; } = new();
40+
/// <summary>
41+
/// The current count of items in the data structure.
42+
/// </summary>
43+
private int _count = 0;
44+
45+
/// <summary>
46+
/// Gets the current count of items in the data structure.
47+
/// </summary>
48+
public int Count => _count;
49+
50+
/// <summary>
51+
/// Gets the options for configuring the behavior of the parallel channel, such as maximum degree of parallelism.
52+
/// </summary>
53+
public ParallelChannelOptions Options { get; } = new();
3654

3755
/// <summary>
3856
/// Initialize the priority channel.
3957
/// </summary>
4058
/// <param name="priCount">How many prioritys the channel sould handle</param>
41-
internal PriorityChannel(int priCount)
59+
internal FixedPriorityChannel(int priCount)
4260
{
4361
_priorityCount = priCount;
4462
_queues = new ConcurrentQueue<PriorityItem<TElement>>[_priorityCount];
4563
for (int i = 0; i < _priorityCount; i++)
4664
_queues[i] = new ConcurrentQueue<PriorityItem<TElement>>();
4765

48-
Reader = new PriorityChannelReader(this);
49-
Writer = new PriorityChannelWriter(this);
66+
Reader = new FixedPriorityChannelReader(this);
67+
Writer = new FixedPriorityChannelWriter(this);
5068

5169
}
5270

5371
/// <summary>
54-
/// Executes a parallel for-each operation on this instance of <see cref="PriorityChannel{TElement}"/>,
72+
/// Executes a parallel for-each operation on this instance of <see cref="FixedPriorityChannel{TElement}"/>,
5573
/// enforcing a dynamic maximum degree of parallelism.
5674
/// </summary>
5775
/// <param name="body">Excecution function of parallel reader</param>
5876
/// <returns>A Task</returns>
5977
/// <exception cref="ArgumentNullException"></exception>
60-
internal Task RunParallelReader(Func<PriorityItem<TElement>, CancellationToken, ValueTask> body)
78+
public Task RunParallelReader(Func<PriorityItem<TElement>, CancellationToken, ValueTask> body)
6179
{
6280
_ = body ?? throw new ArgumentNullException(nameof(body));
6381

@@ -104,15 +122,21 @@ private async IAsyncEnumerable<PriorityItem<TElement>> GetThrottledSource(Semaph
104122
}
105123
}
106124

125+
/// <summary>
126+
/// Attempts to remove the specified item from the priority channel.
127+
/// </summary>
128+
/// <param name="item">The item to remove.</param>
129+
/// <returns>True if the item was successfully removed; otherwise, false.</returns>
130+
public bool TryRemove(PriorityItem<TElement> item) => ((FixedPriorityChannelWriter)Writer).TryRemove(item);
107131

108132

109-
private sealed class PriorityChannelReader : ChannelReader<PriorityItem<TElement>>
133+
private sealed class FixedPriorityChannelReader : ChannelReader<PriorityItem<TElement>>
110134
{
111-
internal readonly PriorityChannel<TElement> _parent;
135+
internal readonly FixedPriorityChannel<TElement> _parent;
112136
private readonly AsyncOperation<PriorityItem<TElement>> _readerSingleton;
113137
private readonly AsyncOperation<bool> _waiterSingleton;
114138

115-
internal PriorityChannelReader(PriorityChannel<TElement> parent)
139+
internal FixedPriorityChannelReader(FixedPriorityChannel<TElement> parent)
116140
{
117141
_parent = parent;
118142
_readerSingleton = new AsyncOperation<PriorityItem<TElement>>(true, pooled: true);
@@ -125,33 +149,33 @@ internal PriorityChannelReader(PriorityChannel<TElement> parent)
125149

126150
public override bool CanPeek => true;
127151

128-
public override int Count => _parent.m_count;
152+
public override int Count => _parent._count;
129153

130154
public override ValueTask<PriorityItem<TElement>> ReadAsync(CancellationToken cancellationToken)
131155
{
132156
if (cancellationToken.IsCancellationRequested)
133157
return new ValueTask<PriorityItem<TElement>>(Task.FromCanceled<PriorityItem<TElement>>(cancellationToken));
134158

135-
PriorityChannel<TElement> parent = _parent;
159+
FixedPriorityChannel<TElement> parent = _parent;
136160
for (int i = 0; i < parent._priorityCount; i++)
137161
if (parent._queues[i].TryDequeue(out PriorityItem<TElement>? item))
138162
{
139-
Interlocked.Decrement(ref parent.m_count);
163+
Interlocked.Decrement(ref parent._count);
140164
CompleteIfDone(parent);
141165
return new ValueTask<PriorityItem<TElement>>(item);
142166
}
143167

144168
return LockReadAsync(parent, cancellationToken);
145169
}
146170

147-
private ValueTask<PriorityItem<TElement>> LockReadAsync(PriorityChannel<TElement> parent, CancellationToken cancellationToken)
171+
private ValueTask<PriorityItem<TElement>> LockReadAsync(FixedPriorityChannel<TElement> parent, CancellationToken cancellationToken)
148172
{
149173
lock (parent.SyncObj)
150174
{
151175
for (int i = 0; i < parent._priorityCount; i++)
152176
if (parent._queues[i].TryDequeue(out PriorityItem<TElement>? item))
153177
{
154-
Interlocked.Decrement(ref parent.m_count);
178+
Interlocked.Decrement(ref parent._count);
155179
CompleteIfDone(parent);
156180
return new ValueTask<PriorityItem<TElement>>(item);
157181
}
@@ -182,12 +206,12 @@ private ValueTask<PriorityItem<TElement>> LockReadAsync(PriorityChannel<TElement
182206
/// <returns>A bool that indicates success</returns>
183207
public override bool TryRead([MaybeNullWhen(false)] out PriorityItem<TElement> item)
184208
{
185-
PriorityChannel<TElement> parent = _parent;
209+
FixedPriorityChannel<TElement> parent = _parent;
186210

187211
for (int i = 0; i < parent._priorityCount; i++)
188212
if (parent._queues[i].TryDequeue(out item))
189213
{
190-
Interlocked.Decrement(ref parent.m_count);
214+
Interlocked.Decrement(ref parent._count);
191215
CompleteIfDone(parent);
192216
return true;
193217
}
@@ -198,15 +222,15 @@ public override bool TryRead([MaybeNullWhen(false)] out PriorityItem<TElement> i
198222

199223
public override bool TryPeek([MaybeNullWhen(false)] out PriorityItem<TElement> item)
200224
{
201-
PriorityChannel<TElement> parent = _parent;
225+
FixedPriorityChannel<TElement> parent = _parent;
202226
for (int i = 0; i < _parent._priorityCount; i++)
203227
if (parent._queues[i].TryPeek(out item))
204228
return true;
205229
item = null;
206230
return false;
207231
}
208232

209-
private static void CompleteIfDone(PriorityChannel<TElement> parent)
233+
private static void CompleteIfDone(FixedPriorityChannel<TElement> parent)
210234
{
211235
if (parent._doneWriting != null && parent._queues.All(x => x.IsEmpty))
212236
ChannelUtilities.Complete(parent._completion, parent._doneWriting);
@@ -223,7 +247,7 @@ public override ValueTask<bool> WaitToReadAsync(CancellationToken cancellationTo
223247
return new ValueTask<bool>(true);
224248

225249

226-
PriorityChannel<TElement> parent = _parent;
250+
FixedPriorityChannel<TElement> parent = _parent;
227251

228252
lock (parent.SyncObj)
229253
{
@@ -253,14 +277,14 @@ public override ValueTask<bool> WaitToReadAsync(CancellationToken cancellationTo
253277

254278
}
255279

256-
private sealed class PriorityChannelWriter : ChannelWriter<PriorityItem<TElement>>
280+
private sealed class FixedPriorityChannelWriter : ChannelWriter<PriorityItem<TElement>>
257281
{
258-
internal readonly PriorityChannel<TElement> _parent;
259-
internal PriorityChannelWriter(PriorityChannel<TElement> parent) => _parent = parent;
282+
internal readonly FixedPriorityChannel<TElement> _parent;
283+
internal FixedPriorityChannelWriter(FixedPriorityChannel<TElement> parent) => _parent = parent;
260284

261285
public override bool TryComplete(Exception? error)
262286
{
263-
PriorityChannel<TElement> parent = _parent;
287+
FixedPriorityChannel<TElement> parent = _parent;
264288
bool completeTask;
265289

266290
lock (parent.SyncObj)
@@ -276,7 +300,7 @@ public override bool TryComplete(Exception? error)
276300
return true;
277301
}
278302

279-
private static void CompleatChannelUtils(Exception? error, PriorityChannel<TElement> parent, bool completeTask)
303+
private static void CompleatChannelUtils(Exception? error, FixedPriorityChannel<TElement> parent, bool completeTask)
280304
{
281305
if (completeTask)
282306
ChannelUtilities.Complete(parent._completion, error);
@@ -287,17 +311,17 @@ private static void CompleatChannelUtils(Exception? error, PriorityChannel<TElem
287311

288312
public override bool TryWrite(PriorityItem<TElement> pair)
289313
{
290-
PriorityChannel<TElement> parent = _parent;
314+
FixedPriorityChannel<TElement> parent = _parent;
291315
while (true)
292316
{
293-
if (PriorityChannel<TElement>.PriorityChannelWriter.TryWriteLock(parent, pair, out AsyncOperation<PriorityItem<TElement>>? blockedReader, out AsyncOperation<bool>? waitingReadersTail) is bool result)
317+
if (FixedPriorityChannel<TElement>.FixedPriorityChannelWriter.TryWriteLock(parent, pair, out AsyncOperation<PriorityItem<TElement>>? blockedReader, out AsyncOperation<bool>? waitingReadersTail) is bool result)
294318
return result;
295319

296320
if (blockedReader != null)
297321
{
298322
if (blockedReader.TrySetResult(pair))
299323
{
300-
Interlocked.Increment(ref parent.m_count);
324+
Interlocked.Increment(ref parent._count);
301325
return true;
302326
}
303327
}
@@ -309,7 +333,7 @@ public override bool TryWrite(PriorityItem<TElement> pair)
309333
}
310334
}
311335

312-
private static bool? TryWriteLock(PriorityChannel<TElement> parent, PriorityItem<TElement> pair, out AsyncOperation<PriorityItem<TElement>>? blockedReader, out AsyncOperation<bool>? waitingReadersTail)
336+
private static bool? TryWriteLock(FixedPriorityChannel<TElement> parent, PriorityItem<TElement> pair, out AsyncOperation<PriorityItem<TElement>>? blockedReader, out AsyncOperation<bool>? waitingReadersTail)
313337
{
314338
waitingReadersTail = null;
315339
blockedReader = null;
@@ -321,8 +345,8 @@ public override bool TryWrite(PriorityItem<TElement> pair)
321345

322346
if (parent._blockedReaders.IsEmpty)
323347
{
324-
parent._queues[pair.Priority].Enqueue(pair);
325-
Interlocked.Increment(ref parent.m_count);
348+
parent._queues[(int)pair.Priority].Enqueue(pair);
349+
Interlocked.Increment(ref parent._count);
326350
waitingReadersTail = parent._waitingReadersTail;
327351
if (waitingReadersTail == null)
328352
return true;
@@ -350,6 +374,23 @@ public override ValueTask WriteAsync(PriorityItem<TElement> item, CancellationTo
350374
cancellationToken.IsCancellationRequested ? new ValueTask(Task.FromCanceled(cancellationToken)) :
351375
TryWrite(item) ? default :
352376
new ValueTask(Task.FromException(ChannelUtilities.CreateInvalidCompletionException(_parent._doneWriting)));
377+
378+
/// <summary>
379+
/// Attempts to remove the specified item from the priority channel.
380+
/// </summary>
381+
/// <param name="item">The item to remove.</param>
382+
/// <returns>True if the item was successfully removed; otherwise, false.</returns>
383+
public bool TryRemove(PriorityItem<TElement> item)
384+
{
385+
FixedPriorityChannel<TElement> parent = _parent;
386+
lock (parent.SyncObj)
387+
{
388+
if (parent._doneWriting != null)
389+
return false;
390+
391+
return parent._queues[(int)item.Priority].TryRemove(item);
392+
}
393+
}
353394
}
354395

355396
/// <summary>
@@ -374,8 +415,6 @@ public PriorityItem<TElement>[] ToArray()
374415
}
375416
}
376417

377-
378-
379418
/// <summary>Gets the object used to synchronize access to all state on this instance.</summary>
380419
private object SyncObj => _queues;
381420
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
using System.Threading.Channels;
2+
3+
namespace Requests.Channel
4+
{
5+
/// <summary>
6+
/// Represents a priority channel that allows reading and writing of items with associated priorities.
7+
/// </summary>
8+
/// <typeparam name="TElement">The type of elements in the channel.</typeparam>
9+
public interface IPriorityChannel<TElement>
10+
{
11+
/// <summary>
12+
/// Gets the reader for the priority channel, which allows reading items with their associated priorities.
13+
/// </summary>
14+
ChannelReader<PriorityItem<TElement>> Reader { get; }
15+
16+
/// <summary>
17+
/// Gets the writer for the priority channel, which allows writing items with their associated priorities.
18+
/// </summary>
19+
ChannelWriter<PriorityItem<TElement>> Writer { get; }
20+
21+
/// <summary>
22+
/// Gets the options for configuring the behavior of the parallel channel, such as maximum degree of parallelism.
23+
/// </summary>
24+
ParallelChannelOptions Options { get; }
25+
26+
/// <summary>
27+
/// Gets the current count of items in the data structure.
28+
/// </summary>
29+
public int Count { get; }
30+
31+
/// <summary>
32+
/// Executes a parallel reader operation on the priority channel, processing items with their associated priorities.
33+
/// </summary>
34+
/// <param name="body">The function to execute for each item in the channel.</param>
35+
/// <returns>A task that represents the completion of the parallel reader operation.</returns>
36+
Task RunParallelReader(Func<PriorityItem<TElement>, CancellationToken, ValueTask> body);
37+
38+
/// <summary>
39+
/// Converts the items in the priority channel to an array, preserving their order and priorities.
40+
/// </summary>
41+
/// <returns>An array of items with their associated priorities.</returns>
42+
PriorityItem<TElement>[] ToArray();
43+
44+
/// <summary>
45+
/// Attempts to remove the specified item from the priority channel.
46+
/// </summary>
47+
/// <param name="item">The item to remove.</param>
48+
/// <returns>True if the item was successfully removed; otherwise, false.</returns>
49+
bool TryRemove(PriorityItem<TElement> item);
50+
}
51+
}

Requests/IRequestContainer.cs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
using Requests.Options;
2+
3+
namespace Requests
4+
{
5+
/// <summary>
6+
/// Represents a container that combines multiple <see cref="IRequest"/> instances.
7+
/// </summary>
8+
/// <typeparam name="TRequest">A class that implements <see cref="IRequest"/>.</typeparam>
9+
public interface IRequestContainer<TRequest> : IEnumerable<TRequest>, IRequest where TRequest : IRequest
10+
{
11+
12+
/// <summary>
13+
/// Gets the count of <see cref="IRequest"/> instances contained in the <see cref="IRequestContainer{TRequest}"/>.
14+
/// </summary>
15+
int Count { get; }
16+
17+
/// <summary>
18+
/// Incorporates a <see cref="IRequest"/> into the <see cref="IRequestContainer{TRequest}"/>.
19+
/// </summary>
20+
/// <param name="request">The <see cref="IRequest"/> to be incorporated.</param>
21+
void Add(TRequest request);
22+
23+
/// <summary>
24+
/// Incorporates multiple <see cref="IRequest"/> into the <see cref="IRequestContainer{TRequest}"/>.
25+
/// </summary>
26+
/// <param name="requests">The <see cref="IRequest"/> to be incorporated.</param>
27+
void AddRange(params TRequest[] requests);
28+
29+
/// <summary>
30+
/// Removes a specific <see cref="IRequest"/> from this container.
31+
/// </summary>
32+
/// <param name="requests">The request to be removed.</param>
33+
void Remove(params TRequest[] requests);
34+
35+
/// <summary>
36+
/// Sets the priority for the <see cref="IRequestContainer{TRequest}"/>.
37+
/// Not to the contained <see cref="IRequest"/> objects.
38+
/// </summary>
39+
void SetPriority(RequestPriority priority);
40+
}
41+
}

0 commit comments

Comments
 (0)