Skip to content

Commit 31364c1

Browse files
committed
Reducd lock contention
1 parent 68039c9 commit 31364c1

File tree

6 files changed

+82
-67
lines changed

6 files changed

+82
-67
lines changed

src/DotNext.Threading/Threading/AsyncExchanger.cs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,7 @@ internal void Initialize(AsyncExchanger<T> owner, T value)
2929
this.value = value;
3030
}
3131

32-
protected override void AfterConsumed()
33-
{
34-
if (owner is { } ownerCopy && TryReset(out _))
35-
ownerCopy.OnCompleted(this);
36-
}
32+
protected override void AfterConsumed() => owner?.OnCompleted(this);
3733

3834
protected override void CleanUp()
3935
{
@@ -80,7 +76,7 @@ private ExchangePoint RentExchangePoint(T value)
8076
{
8177
Debug.Assert(Monitor.IsEntered(SyncRoot));
8278

83-
var result = pool.Get<ExchangePoint>();
79+
var result = pool.Rent<ExchangePoint>();
8480
result.Initialize(this, value);
8581
return result;
8682
}
@@ -89,10 +85,16 @@ private void OnCompleted(ExchangePoint point)
8985
{
9086
Monitor.Enter(SyncRoot);
9187
if (ReferenceEquals(this.point, point))
88+
{
9289
this.point = null;
90+
}
9391

94-
pool.Return(point);
9592
Monitor.Exit(SyncRoot);
93+
94+
if (point.TryReset(out _))
95+
{
96+
pool.Return(point);
97+
}
9698
}
9799

98100
/// <summary>

src/DotNext.Threading/Threading/QueuedSynchronizer.Queue.cs

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -43,34 +43,36 @@ private protected WaitQueueVisitor GetWaitQueue(ref LinkedValueTaskCompletionSou
4343
[MethodImpl(MethodImplOptions.NoInlining)]
4444
private void ReturnNode(WaitNode node)
4545
{
46-
var syncRoot = SyncRoot;
4746
if (node.NeedsRemoval)
4847
{
49-
var suspendedCallers = default(LinkedValueTaskCompletionSource<bool>);
50-
Monitor.Enter(syncRoot);
51-
try
52-
{
53-
suspendedCallers = waitQueue.Remove(node) && node.DrainOnReturn
54-
? DrainWaitQueue()
55-
: null;
56-
}
57-
finally
58-
{
59-
Monitor.Exit(syncRoot);
60-
suspendedCallers?.Unwind();
61-
}
48+
RemoveAndDrainIfNeeded(node);
6249
}
6350

6451
// the node is removed for sure, it can be returned back to the pool
6552
if (node.TryReset(out _))
6653
{
67-
lock (syncRoot)
68-
{
69-
pool.Return(node);
70-
}
54+
pool.Return(node);
7155
}
7256
}
73-
57+
58+
private void RemoveAndDrainIfNeeded(WaitNode node)
59+
{
60+
var syncRoot = SyncRoot;
61+
var suspendedCallers = default(LinkedValueTaskCompletionSource<bool>);
62+
Monitor.Enter(syncRoot);
63+
try
64+
{
65+
suspendedCallers = waitQueue.Remove(node) && node.DrainOnReturn
66+
? DrainWaitQueue()
67+
: null;
68+
}
69+
finally
70+
{
71+
Monitor.Exit(syncRoot);
72+
suspendedCallers?.Unwind();
73+
}
74+
}
75+
7476
private protected TNode? Acquire<T, TBuilder, TNode>(ref TBuilder builder, bool acquired)
7577
where T : struct, IEquatable<T>
7678
where TNode : WaitNode, new()
@@ -97,7 +99,7 @@ private void ReturnNode(WaitNode node)
9799
}
98100
else
99101
{
100-
var node = pool.Get<TNode>();
102+
var node = pool.Rent<TNode>();
101103
node.Initialize(this, CaptureCallerInformation(), TBuilder.ThrowOnTimeout);
102104
waitQueue.Add(node);
103105
builder.Complete(node);

src/DotNext.Threading/Threading/Tasks/LinkedValueTaskCompletionSource.cs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ private protected LinkedValueTaskCompletionSource(bool runContinuationsAsynchron
1010
{
1111
}
1212

13-
internal LinkedValueTaskCompletionSource<T>? Next { get; private set; }
13+
internal LinkedValueTaskCompletionSource<T>? Next { get; set; }
1414

1515
internal LinkedValueTaskCompletionSource<T>? Previous { get; private set; }
1616

@@ -22,14 +22,6 @@ internal void Append(LinkedValueTaskCompletionSource<T> node)
2222
Next = node;
2323
}
2424

25-
internal void Prepend(LinkedValueTaskCompletionSource<T> node)
26-
{
27-
Debug.Assert(Previous is null);
28-
29-
node.Next = this;
30-
Previous = node;
31-
}
32-
3325
internal void Detach()
3426
{
3527
if (Previous is not null)

src/DotNext.Threading/Threading/Tasks/Pooling/ValueTaskPool.cs

Lines changed: 42 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,13 @@ namespace DotNext.Threading.Tasks.Pooling;
77
/*
88
* Represents a pool without any allocations. Assuming that wait queues are organized using
99
* linked nodes where each node is a completion source. If so, the node pointers (next/previous)
10-
* can be used to keep completion sources in the pool. Moreover, access to the pool is synchronized
11-
* by the caller (see QueuedSynchronizer). Thus, it doesn't require explicit synchronization such as
12-
* Monitor lock.
10+
* can be used to keep completion sources in the pool. The implementation is thread-safe.
1311
*/
1412
[StructLayout(LayoutKind.Auto)]
1513
internal struct ValueTaskPool<T>(long maximumRetained)
1614
{
17-
private LinkedValueTaskCompletionSource<T>? first;
18-
private long count;
15+
private volatile LinkedValueTaskCompletionSource<T>? first;
16+
private long count; // volatile
1917

2018
public ValueTaskPool()
2119
: this(long.MaxValue)
@@ -26,41 +24,57 @@ public ValueTaskPool()
2624

2725
public void Return(LinkedValueTaskCompletionSource<T> node)
2826
{
29-
Debug.Assert(node is not null);
30-
Debug.Assert(count is 0L || first is not null);
31-
Debug.Assert(node.Status is ManualResetCompletionSourceStatus.WaitForActivation);
27+
Debug.Assert(node is { Status: ManualResetCompletionSourceStatus.WaitForActivation });
28+
Debug.Assert(node is { Next: null, Previous: null });
3229

33-
if (count < maximumRetained)
30+
// try to increment the counter
31+
for (long current = Atomic.Read(in count), tmp; current < maximumRetained; current = tmp)
3432
{
35-
first?.Prepend(node);
36-
first = node;
37-
count++;
38-
Debug.Assert(count > 0L);
33+
tmp = Interlocked.CompareExchange(ref count, current + 1L, current);
34+
if (tmp == current)
35+
{
36+
ReturnCore(node);
37+
break;
38+
}
3939
}
4040
}
4141

42-
public TNode Get<TNode>()
43-
where TNode : LinkedValueTaskCompletionSource<T>, new()
42+
private void ReturnCore(LinkedValueTaskCompletionSource<T> node)
4443
{
45-
TNode result;
46-
if (first is null)
44+
for (LinkedValueTaskCompletionSource<T>? current = first, tmp;; current = tmp)
4745
{
48-
Debug.Assert(count == 0L);
46+
node.Next = current;
4947

50-
result = new();
48+
tmp = Interlocked.CompareExchange(ref first, node, current);
49+
if (ReferenceEquals(tmp, current))
50+
break;
5151
}
52-
else
52+
}
53+
54+
public TNode Rent<TNode>()
55+
where TNode : LinkedValueTaskCompletionSource<T>, new()
56+
{
57+
var current = first;
58+
for (LinkedValueTaskCompletionSource<T>? tmp;; current = Unsafe.As<TNode>(tmp))
5359
{
54-
Debug.Assert(first is TNode);
55-
result = Unsafe.As<TNode>(first);
56-
first = result.Next;
57-
result.Detach();
58-
count--;
60+
if (current is null)
61+
{
62+
current = new TNode();
63+
break;
64+
}
65+
66+
tmp = Interlocked.CompareExchange(ref first, current.Next, current);
67+
if (!ReferenceEquals(tmp, current))
68+
continue;
5969

60-
Debug.Assert(count >= 0L);
61-
Debug.Assert(count is 0L || first is not null);
70+
current.Next = null;
71+
var actualCount = Interlocked.Decrement(ref count);
72+
Debug.Assert(actualCount >= 0L);
73+
break;
6274
}
6375

64-
return result;
76+
Debug.Assert(current is TNode);
77+
Debug.Assert(current is { Next: null, Previous: null });
78+
return Unsafe.As<TNode>(current);
6579
}
6680
}

src/DotNext.Threading/Threading/Tasks/TaskCompletionPipe.Signal.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ private LinkedValueTaskCompletionSource<bool> EnqueueNode()
4545
{
4646
Debug.Assert(Monitor.IsEntered(SyncRoot));
4747

48-
var result = pool.Get<Signal>();
48+
var result = pool.Rent<Signal>();
4949
result.Initialize(this);
5050
waitQueue.Add(result);
5151
return result;

src/DotNext.Threading/Threading/Tasks/TaskCompletionPipe.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,16 @@ public bool IsCompletionTaskSupported
4646

4747
private void OnCompleted(Signal signal)
4848
{
49-
lock (SyncRoot)
49+
if (signal.NeedsRemoval)
5050
{
51-
if (signal.NeedsRemoval)
51+
lock (SyncRoot)
52+
{
5253
waitQueue.Remove(signal);
54+
}
55+
}
5356

57+
if (signal.TryReset(out _))
58+
{
5459
pool.Return(signal);
5560
}
5661
}

0 commit comments

Comments
 (0)