Skip to content

Commit 5fcf4e5

Browse files
committed
Migration to multiplexed cancellation token
1 parent 6be3ae8 commit 5fcf4e5

File tree

10 files changed

+269
-246
lines changed

10 files changed

+269
-246
lines changed

src/DotNext.Threading/Runtime/Caching/RandomAccessCache.cs

Lines changed: 45 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public partial class RandomAccessCache<TKey, TValue> : Disposable, IAsyncDisposa
2525
private readonly CancellationToken lifetimeToken;
2626
private readonly IEqualityComparer<TKey>? keyComparer;
2727
private readonly bool growable;
28+
private readonly CancellationTokenMultiplexer cancellationTokens;
2829
private Task evictionTask;
2930

3031
[SuppressMessage("Usage", "CA2213", Justification = "False positive.")]
@@ -50,6 +51,7 @@ private protected RandomAccessCache(int dictionarySize, int collisionThreshold,
5051
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(collisionThreshold, thresholdName);
5152

5253
dictionarySize = PrimeNumber.GetPrime(dictionarySize);
54+
cancellationTokens = new() { MaximumRetained = 100 };
5355
maxCacheCapacity = (growable = collisionThreshold is not int.MaxValue)
5456
? collisionThreshold
5557
: dictionarySize;
@@ -89,19 +91,19 @@ private async ValueTask<ReadWriteSession> ChangeAsync(TKey key, Timeout timeout,
8991
{
9092
var hashCode = keyComparer?.GetHashCode(key) ?? EqualityComparer<TKey>.Default.GetHashCode(key);
9193

92-
var cts = token.LinkTo(lifetimeToken);
94+
var cts = cancellationTokens.Combine([token, lifetimeToken]);
9395
var bucketLock = default(AsyncExclusiveLock);
9496
try
9597
{
96-
for (BucketList bucketsCopy;; await GrowAsync(bucketsCopy, timeout, token).ConfigureAwait(false))
98+
for (BucketList bucketsCopy;; await GrowAsync(bucketsCopy, timeout, cts.Token).ConfigureAwait(false))
9799
{
98100
Bucket.Ref bucket;
99101

100102
bucketsCopy = buckets;
101103
for (BucketList newCopy;; bucketsCopy = newCopy)
102104
{
103105
bucketsCopy.GetByHash(hashCode, out bucket);
104-
await bucket.Value.Lock.AcquireAsync(timeout.GetRemainingTimeOrZero(), token).ConfigureAwait(false);
106+
await bucket.Value.Lock.AcquireAsync(timeout.GetRemainingTimeOrZero(), cts.Token).ConfigureAwait(false);
105107
bucketLock = bucket.Value.Lock;
106108

107109
newCopy = buckets;
@@ -123,20 +125,18 @@ private async ValueTask<ReadWriteSession> ChangeAsync(TKey key, Timeout timeout,
123125
bucketLockCopy.Release();
124126
}
125127
}
126-
catch (OperationCanceledException e) when (e.CancellationToken == cts?.Token)
128+
catch (OperationCanceledException e) when (e.CausedBy(cts, lifetimeToken))
127129
{
128-
throw cts.CancellationOrigin == lifetimeToken
129-
? CreateException()
130-
: new OperationCanceledException(cts.CancellationOrigin);
130+
throw CreateException();
131131
}
132-
catch (OperationCanceledException e) when (e.CancellationToken == lifetimeToken)
132+
catch (OperationCanceledException e) when (e.CancellationToken == cts.Token)
133133
{
134-
throw CreateException();
134+
throw new OperationCanceledException(e.Message, e, cts.CancellationOrigin);
135135
}
136136
finally
137137
{
138-
cts?.Dispose();
139138
bucketLock?.Release();
139+
await cts.DisposeAsync().ConfigureAwait(false);
140140
}
141141
}
142142

@@ -177,19 +177,19 @@ private async ValueTask<ReadWriteSession> ReplaceAsync(TKey key, Timeout timeout
177177
{
178178
var hashCode = keyComparer?.GetHashCode(key) ?? EqualityComparer<TKey>.Default.GetHashCode(key);
179179

180-
var cts = token.LinkTo(lifetimeToken);
180+
var cts = cancellationTokens.Combine([token, lifetimeToken]);
181181
var bucketLock = default(AsyncExclusiveLock);
182182
try
183183
{
184-
for (BucketList bucketsCopy;; await GrowAsync(bucketsCopy, timeout, token).ConfigureAwait(false))
184+
for (BucketList bucketsCopy;; await GrowAsync(bucketsCopy, timeout, cts.Token).ConfigureAwait(false))
185185
{
186186
Bucket.Ref bucket;
187187

188188
bucketsCopy = buckets;
189189
for (BucketList newCopy;; bucketsCopy = newCopy)
190190
{
191191
bucketsCopy.GetByHash(hashCode, out bucket);
192-
await bucket.Value.Lock.AcquireAsync(timeout.GetRemainingTimeOrZero(), token).ConfigureAwait(false);
192+
await bucket.Value.Lock.AcquireAsync(timeout.GetRemainingTimeOrZero(), cts.Token).ConfigureAwait(false);
193193
bucketLock = bucket.Value.Lock;
194194

195195
newCopy = buckets;
@@ -206,27 +206,25 @@ private async ValueTask<ReadWriteSession> ReplaceAsync(TKey key, Timeout timeout
206206
{
207207
if (bucket.Value.TryRemove(keyComparer, key, hashCode) is { } removedPair && removedPair.ReleaseCounter() is false)
208208
OnRemoved(removedPair);
209-
209+
210210
return new(this, in bucket, bucketLockCopy, key, hashCode);
211211
}
212212

213213
bucketLockCopy.Release();
214214
}
215215
}
216-
catch (OperationCanceledException e) when (e.CancellationToken == cts?.Token)
216+
catch (OperationCanceledException e) when (e.CausedBy(cts, lifetimeToken))
217217
{
218-
throw cts.CancellationOrigin == lifetimeToken
219-
? CreateException()
220-
: new OperationCanceledException(cts.CancellationOrigin);
218+
throw CreateException();
221219
}
222-
catch (OperationCanceledException e) when (e.CancellationToken == lifetimeToken)
220+
catch (OperationCanceledException e) when (e.CancellationToken == cts.Token)
223221
{
224-
throw CreateException();
222+
throw new OperationCanceledException(e.Message, e, cts.CancellationOrigin);
225223
}
226224
finally
227225
{
228-
cts?.Dispose();
229226
bucketLock?.Release();
227+
await cts.DisposeAsync().ConfigureAwait(false);
230228
}
231229
}
232230

@@ -299,21 +297,21 @@ public bool Contains(TKey key)
299297
{
300298
var hashCode = keyComparer?.GetHashCode(key) ?? EqualityComparer<TKey>.Default.GetHashCode(key);
301299

302-
var cts = token.LinkTo(lifetimeToken);
300+
var cts = cancellationTokens.Combine([token, lifetimeToken]);
303301
var bucketLock = default(AsyncExclusiveLock);
304302
try
305303
{
306304
Bucket.Ref bucket;
307305
for (BucketList bucketsCopy = buckets, newCopy;; bucketsCopy = newCopy)
308306
{
309307
bucketsCopy.GetByHash(hashCode, out bucket);
310-
await bucket.Value.Lock.AcquireAsync(timeout, token).ConfigureAwait(false);
308+
await bucket.Value.Lock.AcquireAsync(timeout, cts.Token).ConfigureAwait(false);
311309
bucketLock = bucket.Value.Lock;
312310

313311
newCopy = buckets;
314312
if (ReferenceEquals(newCopy, bucketsCopy))
315313
break;
316-
314+
317315
bucketLock.Release();
318316
bucketLock = null;
319317
}
@@ -322,19 +320,18 @@ public bool Contains(TKey key)
322320
? new ReadSession(this, removedPair)
323321
: null;
324322
}
325-
catch (OperationCanceledException e) when (e.CancellationToken == cts?.Token)
323+
catch (OperationCanceledException e) when (e.CausedBy(cts, lifetimeToken))
326324
{
327-
throw cts.CancellationOrigin == lifetimeToken
328-
? CreateException()
329-
: new OperationCanceledException(cts.CancellationOrigin);
325+
throw CreateException();
330326
}
331-
catch (OperationCanceledException e) when (e.CancellationToken == lifetimeToken)
327+
catch (OperationCanceledException e) when (e.CancellationToken == cts.Token)
332328
{
333-
throw CreateException();
329+
throw new OperationCanceledException(e.Message, e, cts.CancellationOrigin);
334330
}
335331
finally
336332
{
337333
bucketLock?.Release();
334+
await cts.DisposeAsync().ConfigureAwait(false);
338335
}
339336
}
340337

@@ -374,7 +371,7 @@ private async ValueTask<bool> InvalidateAsync(TKey key, TimeSpan timeout, Cancel
374371
{
375372
var hashCode = keyComparer?.GetHashCode(key) ?? EqualityComparer<TKey>.Default.GetHashCode(key);
376373

377-
var cts = token.LinkTo(lifetimeToken);
374+
var cts = cancellationTokens.Combine([token, lifetimeToken]);
378375
var bucketLock = default(AsyncExclusiveLock);
379376
KeyValuePair? removedPair;
380377
try
@@ -383,32 +380,31 @@ private async ValueTask<bool> InvalidateAsync(TKey key, TimeSpan timeout, Cancel
383380
for (BucketList bucketsCopy = buckets, newCopy;; bucketsCopy = newCopy)
384381
{
385382
bucketsCopy.GetByHash(hashCode, out bucket);
386-
await bucket.Value.Lock.AcquireAsync(timeout, token).ConfigureAwait(false);
383+
await bucket.Value.Lock.AcquireAsync(timeout, cts.Token).ConfigureAwait(false);
387384
bucketLock = bucket.Value.Lock;
388385

389386
newCopy = buckets;
390387
if (ReferenceEquals(newCopy, bucketsCopy))
391388
break;
392-
389+
393390
bucketLock.Release();
394391
bucketLock = null;
395392
}
396-
393+
397394
removedPair = bucket.Value.TryRemove(keyComparer, key, hashCode);
398395
}
399-
catch (OperationCanceledException e) when (e.CancellationToken == cts?.Token)
396+
catch (OperationCanceledException e) when (e.CausedBy(cts, lifetimeToken))
400397
{
401-
throw cts.CancellationOrigin == lifetimeToken
402-
? CreateException()
403-
: new OperationCanceledException(cts.CancellationOrigin);
398+
throw CreateException();
404399
}
405-
catch (OperationCanceledException e) when (e.CancellationToken == lifetimeToken)
400+
catch (OperationCanceledException e) when (e.CancellationToken == cts.Token)
406401
{
407-
throw CreateException();
402+
throw new OperationCanceledException(e.Message, e, cts.CancellationOrigin);
408403
}
409404
finally
410405
{
411406
bucketLock?.Release();
407+
await cts.DisposeAsync().ConfigureAwait(false);
412408
}
413409

414410
if (removedPair is null)
@@ -456,18 +452,18 @@ public bool Invalidate(TKey key, TimeSpan timeout, CancellationToken token = def
456452
/// <exception cref="ObjectDisposedException">The cache is disposed.</exception>
457453
public async ValueTask InvalidateAsync(CancellationToken token = default)
458454
{
459-
var cts = token.LinkTo(lifetimeToken);
455+
var cts = cancellationTokens.Combine([token, lifetimeToken]);
460456
var bucketsCopy = buckets;
461457
var lockCount = 0;
462458
try
463459
{
464460
AsyncExclusiveLock bucketLock;
465-
461+
466462
// take first lock
467463
for (BucketList newCopy;; bucketsCopy = newCopy)
468464
{
469465
bucketLock = bucketsCopy.GetByIndex(0).Lock;
470-
await bucketLock.AcquireAsync(token).ConfigureAwait(false);
466+
await bucketLock.AcquireAsync(cts.Token).ConfigureAwait(false);
471467

472468
newCopy = buckets;
473469
if (ReferenceEquals(newCopy, bucketsCopy))
@@ -480,26 +476,24 @@ public async ValueTask InvalidateAsync(CancellationToken token = default)
480476
for (lockCount = 1; lockCount < bucketsCopy.Count; lockCount++)
481477
{
482478
bucketLock = bucketsCopy.GetByIndex(lockCount).Lock;
483-
await bucketLock.AcquireAsync(token).ConfigureAwait(false);
479+
await bucketLock.AcquireAsync(cts.Token).ConfigureAwait(false);
484480
}
485481

486482
// invalidate all buckets
487483
bucketsCopy.Invalidate(OnRemoved);
488484
}
489-
catch (OperationCanceledException e) when (e.CancellationToken == cts?.Token)
485+
catch (OperationCanceledException e) when (e.CausedBy(cts, lifetimeToken))
490486
{
491-
throw cts.CancellationOrigin == lifetimeToken
492-
? CreateException()
493-
: new OperationCanceledException(cts.CancellationOrigin);
487+
throw CreateException();
494488
}
495-
catch (OperationCanceledException e) when (e.CancellationToken == lifetimeToken)
489+
catch (OperationCanceledException e) when (e.CancellationToken == cts.Token)
496490
{
497-
throw CreateException();
491+
throw new OperationCanceledException(e.Message, e, cts.CancellationOrigin);
498492
}
499493
finally
500494
{
501-
cts?.Dispose();
502495
bucketsCopy.Release(lockCount);
496+
await cts.DisposeAsync().ConfigureAwait(false);
503497
}
504498
}
505499

src/DotNext.Threading/Threading/Channels/PersistentChannelReader.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public override bool TryRead([MaybeNullWhen(false)] out T item)
107107
}
108108

109109
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))]
110-
public override async ValueTask<T> ReadAsync(CancellationToken token)
110+
public override async ValueTask<T> ReadAsync(CancellationToken token = default)
111111
{
112112
var task = await Task.WhenAny(reader.WaitToReadAsync(token), reader.Completion).ConfigureAwait(false);
113113

0 commit comments

Comments
 (0)