Skip to content

Commit 6613394

Browse files
committed
Added cancellation token multiplexer
1 parent 31364c1 commit 6613394

File tree

5 files changed

+284
-4
lines changed

5 files changed

+284
-4
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
namespace DotNext.Threading;
2+
3+
public class CancellationTokenMultiplexerTests : Test
4+
{
5+
[Fact]
6+
public static void CanceledImmediately()
7+
{
8+
var multiplexer = new CancellationTokenMultiplexer();
9+
using var scope = multiplexer.Combine([new(true), new(true)]);
10+
True(scope.Token.IsCancellationRequested);
11+
Equal(new(true), scope.CancellationOrigin);
12+
NotEqual(new(true), scope.Token);
13+
}
14+
15+
[Fact]
16+
public static async Task CanceledImmediatelyAsync()
17+
{
18+
var multiplexer = new CancellationTokenMultiplexer();
19+
await using var scope = multiplexer.Combine([new(true), new(true)]);
20+
True(scope.Token.IsCancellationRequested);
21+
Equal(new(true), scope.CancellationOrigin);
22+
NotEqual(new(true), scope.Token);
23+
}
24+
25+
[Fact]
26+
public static void CheckPooling()
27+
{
28+
CancellationToken token;
29+
var multiplexer = new CancellationTokenMultiplexer { MaximumRetained = int.MaxValue };
30+
using (var scope = multiplexer.Combine([new(false)]))
31+
{
32+
token = scope.Token;
33+
}
34+
35+
// rent again
36+
using (var scope = multiplexer.Combine([new(false)]))
37+
{
38+
Equal(token, scope.Token);
39+
}
40+
}
41+
}
Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
using System.Diagnostics;
2+
using System.Runtime.InteropServices;
3+
4+
namespace DotNext.Threading;
5+
6+
/// <summary>
7+
/// Represents cancellation token multiplexer.
8+
/// </summary>
9+
/// <remarks>
10+
/// The multiplexer provides a pool of <see cref="CancellationTokenSource"/> to combine
11+
/// the cancellation tokens.
12+
/// </remarks>
13+
public sealed class CancellationTokenMultiplexer
14+
{
15+
private readonly int maximumRetained = int.MaxValue;
16+
private volatile int count;
17+
private volatile PooledCancellationTokenSource? firstInPool;
18+
19+
/// <summary>
20+
/// Gets or sets the maximum retained <see cref="CancellationTokenSource"/> instances.
21+
/// </summary>
22+
public int MaximumRetained
23+
{
24+
get => maximumRetained;
25+
init => maximumRetained = value > 0 ? value : throw new ArgumentOutOfRangeException(nameof(value));
26+
}
27+
28+
/// <summary>
29+
/// Combines the multiple tokens.
30+
/// </summary>
31+
/// <param name="tokens">The tokens to be combined.</param>
32+
/// <returns>The scope that contains a single multiplexed token.</returns>
33+
public Scope Combine(ReadOnlySpan<CancellationToken> tokens)
34+
=> new(this, tokens);
35+
36+
private void Return(PooledCancellationTokenSource source)
37+
{
38+
// try to increment the counter
39+
for (int current = count, tmp; current < maximumRetained; current = tmp)
40+
{
41+
tmp = Interlocked.CompareExchange(ref count, current + 1, current);
42+
if (tmp == current)
43+
{
44+
ReturnCore(source);
45+
break;
46+
}
47+
}
48+
}
49+
50+
private void ReturnCore(PooledCancellationTokenSource source)
51+
{
52+
for (PooledCancellationTokenSource? current = firstInPool, tmp;; current = tmp)
53+
{
54+
source.Next = current;
55+
tmp = Interlocked.CompareExchange(ref firstInPool, source, current);
56+
57+
if (ReferenceEquals(tmp, source))
58+
break;
59+
}
60+
}
61+
62+
private PooledCancellationTokenSource Rent()
63+
{
64+
var current = firstInPool;
65+
for (PooledCancellationTokenSource? tmp;; current = tmp)
66+
{
67+
if (current is null)
68+
{
69+
current = new();
70+
break;
71+
}
72+
73+
tmp = Interlocked.CompareExchange(ref firstInPool, current.Next, current);
74+
if (!ReferenceEquals(tmp, current))
75+
continue;
76+
77+
current.Next = null;
78+
var actualCount = Interlocked.Decrement(ref count);
79+
Debug.Assert(actualCount >= 0L);
80+
break;
81+
}
82+
83+
return current;
84+
}
85+
86+
/// <summary>
87+
/// Represents a
88+
/// </summary>
89+
[StructLayout(LayoutKind.Auto)]
90+
public readonly struct Scope : IMultiplexedCancellationTokenSource, IDisposable, IAsyncDisposable
91+
{
92+
private readonly CancellationTokenMultiplexer multiplexer;
93+
private readonly PooledCancellationTokenSource source;
94+
95+
internal Scope(CancellationTokenMultiplexer multiplexer, ReadOnlySpan<CancellationToken> tokens)
96+
{
97+
this.multiplexer = multiplexer;
98+
source = multiplexer.Rent();
99+
100+
foreach (var token in tokens)
101+
{
102+
source.Add(token);
103+
}
104+
}
105+
106+
/// <summary>
107+
/// Gets the cancellation token.
108+
/// </summary>
109+
public CancellationToken Token => source.Token;
110+
111+
/// <summary>
112+
/// Gets the cancellation origin if <see cref="Token"/> is in canceled state.
113+
/// </summary>
114+
public CancellationToken CancellationOrigin => source.CancellationOrigin;
115+
116+
/// <inheritdoc/>
117+
public void Dispose()
118+
{
119+
for (var i = 0; i < source.Count; i++)
120+
{
121+
source[i].Dispose();
122+
}
123+
124+
// now we sure that no one can cancel the source concurrently
125+
Return(multiplexer, source);
126+
}
127+
128+
/// <inheritdoc/>
129+
public ValueTask DisposeAsync() => ReturnAsync(multiplexer, source);
130+
131+
private static async ValueTask ReturnAsync(CancellationTokenMultiplexer multiplexer, PooledCancellationTokenSource source)
132+
{
133+
for (var i = 0; i < source.Count; i++)
134+
{
135+
await source[i].DisposeAsync().ConfigureAwait(false);
136+
}
137+
138+
Return(multiplexer, source);
139+
}
140+
141+
private static void Return(CancellationTokenMultiplexer multiplexer, PooledCancellationTokenSource source)
142+
{
143+
source.Clear();
144+
if (source.IsCancellationRequested)
145+
{
146+
source.Dispose();
147+
}
148+
else
149+
{
150+
multiplexer.Return(source);
151+
}
152+
}
153+
}
154+
155+
private sealed class PooledCancellationTokenSource : LinkedCancellationTokenSource
156+
{
157+
private const int Capacity = 3;
158+
private (CancellationTokenRegistration, CancellationTokenRegistration, CancellationTokenRegistration) inlineList;
159+
private List<CancellationTokenRegistration>? extraTokens;
160+
private int tokenCount;
161+
internal PooledCancellationTokenSource? Next;
162+
163+
public void Add(CancellationToken token)
164+
=> Add() = Attach(token);
165+
166+
private ref CancellationTokenRegistration Add()
167+
{
168+
Span<CancellationTokenRegistration> registrations;
169+
var index = tokenCount;
170+
if (++tokenCount < Capacity)
171+
{
172+
registrations = inlineList.AsSpan();
173+
}
174+
else
175+
{
176+
extraTokens ??= new();
177+
extraTokens.Add(default);
178+
registrations = CollectionsMarshal.AsSpan(extraTokens);
179+
index -= Capacity;
180+
}
181+
182+
return ref registrations[index];
183+
}
184+
185+
public int Count => tokenCount;
186+
187+
public ref CancellationTokenRegistration this[int index]
188+
{
189+
get
190+
{
191+
Span<CancellationTokenRegistration> registrations;
192+
if (index < Capacity)
193+
{
194+
registrations = inlineList.AsSpan();
195+
}
196+
else
197+
{
198+
registrations = CollectionsMarshal.AsSpan(extraTokens);
199+
index -= Capacity;
200+
}
201+
202+
return ref registrations[index];
203+
}
204+
}
205+
206+
public void Clear()
207+
{
208+
inlineList = default;
209+
extraTokens?.Clear();
210+
}
211+
212+
protected override void Dispose(bool disposing)
213+
{
214+
if (disposing)
215+
{
216+
extraTokens = null; // help GC
217+
}
218+
219+
base.Dispose(disposing);
220+
}
221+
}
222+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
namespace DotNext.Threading;
2+
3+
/// <summary>
4+
/// Represents multiplexed cancellation token source.
5+
/// </summary>
6+
public interface IMultiplexedCancellationTokenSource
7+
{
8+
/// <summary>
9+
/// Gets the multiplexed token.
10+
/// </summary>
11+
CancellationToken Token { get; }
12+
13+
/// <summary>
14+
/// Gets the cancellation origin.
15+
/// </summary>
16+
CancellationToken CancellationOrigin { get; }
17+
}

src/DotNext.Threading/Threading/LinkedCancellationTokenSource.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ namespace DotNext.Threading;
1111
/// This source is not resettable. Calling of <see cref="CancellationTokenSource.TryReset"/>
1212
/// may lead to unpredictable results.
1313
/// </remarks>
14-
public abstract class LinkedCancellationTokenSource : CancellationTokenSource
14+
public abstract class LinkedCancellationTokenSource : CancellationTokenSource, IMultiplexedCancellationTokenSource
1515
{
1616
private Atomic.Boolean status;
1717

src/DotNext.Threading/Threading/LinkedTokenSourceFactory.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
using System.Buffers;
21
using DotNext.Buffers;
32
using Debug = System.Diagnostics.Debug;
43

@@ -129,8 +128,9 @@ public static class LinkedTokenSourceFactory
129128
/// <param name="e">The exception to analyze.</param>
130129
/// <param name="token">The token to check</param>
131130
/// <returns><see langword="true"/> indicates that the cancellation caused by <paramref name="source"/> and <see cref="LinkedCancellationTokenSource.CancellationOrigin"/> is <paramref name="token"/> ;or by <paramref name="token"/>.</returns>
132-
public static bool CausedBy(this OperationCanceledException e, LinkedCancellationTokenSource? source, CancellationToken token)
133-
=> source is null ? e.CancellationToken == token : (e.CancellationToken == source.Token && source.CancellationOrigin == token);
131+
public static bool CausedBy<TSource>(this OperationCanceledException e, TSource? source, CancellationToken token)
132+
where TSource : IMultiplexedCancellationTokenSource?
133+
=> source is null ? e.CancellationToken == token : e.CancellationToken == source.Token && source.CancellationOrigin == token;
134134

135135
private sealed class Linked2CancellationTokenSource : LinkedCancellationTokenSource
136136
{

0 commit comments

Comments
 (0)