Skip to content

Commit a4a8767

Browse files
committed
Use linked list to track the dispatchers
1 parent 5136ed3 commit a4a8767

File tree

2 files changed

+80
-18
lines changed

2 files changed

+80
-18
lines changed
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
using System.Diagnostics.CodeAnalysis;
2+
3+
namespace DotNext.Net.Multiplexing;
4+
5+
partial class MultiplexedListener
6+
{
7+
private sealed class TaskNode
8+
{
9+
private readonly WeakReference<Task> task;
10+
private TaskNode? next;
11+
12+
private TaskNode(Task task)
13+
=> this.task = new(task);
14+
15+
private TaskNode? CleanupAndGetNext()
16+
{
17+
var result = next;
18+
next = null;
19+
return result;
20+
}
21+
22+
private Task AttachedTask
23+
{
24+
get
25+
{
26+
if (!task.TryGetTarget(out var result))
27+
result = Task.CompletedTask;
28+
29+
return result;
30+
}
31+
}
32+
33+
private bool IsCompleted => AttachedTask.IsCompleted;
34+
35+
public static void Add([NotNull] ref TaskNode? head, Task task)
36+
{
37+
RemoveCompleted(ref head);
38+
head = new(task) { next = head };
39+
}
40+
41+
private static void RemoveCompleted(ref TaskNode? head)
42+
{
43+
for (TaskNode? current = head, previous = null; current is not null;)
44+
{
45+
if (current.IsCompleted)
46+
{
47+
ref var next = ref previous is not null ? ref previous.next : ref head;
48+
next = current = current.CleanupAndGetNext(); // previous remains unchanged
49+
}
50+
else
51+
{
52+
previous = current;
53+
current = current.next;
54+
}
55+
}
56+
}
57+
58+
public static IEnumerable<Task> GetTasks(TaskNode? head)
59+
{
60+
return head is null ? [] : GetTasksImpl(head);
61+
62+
static IEnumerable<Task> GetTasksImpl(TaskNode? node)
63+
{
64+
while (node is not null)
65+
{
66+
if (node is { AttachedTask: { IsCompleted: false } task })
67+
yield return task;
68+
69+
node = node.next;
70+
}
71+
}
72+
}
73+
}
74+
}

src/cluster/DotNext.Net.Cluster/Net/Multiplexing/MultiplexedListener.cs

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -131,35 +131,23 @@ private async Task ListenAsync()
131131
}
132132

133133
readiness.TrySetResult();
134-
var connections = new HashSet<WeakReference<Task>>();
134+
var headNode = default(TaskNode?);
135135
try
136136
{
137137
while (!lifetimeToken.IsCancellationRequested)
138138
{
139139
var clientSocket = await listeningSocket.AcceptAsync(lifetimeToken).ConfigureAwait(false);
140140
ConfigureAcceptedSocket(clientSocket);
141-
connections.Add(new(DispatchAsync(clientSocket)));
142-
143-
// GC: remove completed tasks
144-
connections.RemoveWhere(IsCompleted);
141+
TaskNode.Add(ref headNode, DispatchAsync(clientSocket));
145142
}
146143
}
147144
finally
148145
{
149146
listeningSocket.Dispose();
150-
await Task.WhenAll(connections.Select(Unwrap)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
151-
connections.Clear();
152-
}
153-
154-
static bool IsCompleted(WeakReference<Task> taskRef)
155-
=> !taskRef.TryGetTarget(out var task) || task.IsCompleted;
156-
157-
static Task Unwrap(WeakReference<Task> taskRef)
158-
{
159-
if (!taskRef.TryGetTarget(out var task))
160-
task = Task.CompletedTask;
161-
162-
return task;
147+
foreach (var task in TaskNode.GetTasks(headNode))
148+
{
149+
await task.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
150+
}
163151
}
164152
}
165153

0 commit comments

Comments
 (0)