Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
#endregion

using System.Diagnostics;
using Stride.Core.Collections;

namespace Stride.Core.Threading;

Expand Down
79 changes: 17 additions & 62 deletions sources/core/Stride.Core.MicroThreading/MicroThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#pragma warning disable SA1402 // File may only contain a single class
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using Stride.Core.Collections;
using Stride.Core.Diagnostics;

namespace Stride.Core.MicroThreading;
Expand All @@ -23,9 +22,10 @@ public class MicroThread

private static long globalCounterId;

private long priority;
private int state;
private readonly CancellationTokenSource cancellationTokenSource;
internal PriorityQueueNode<SchedulerEntry> ScheduledLinkedListNode;
private readonly SchedulerEntry schedulerEntry;
internal LinkedListNode<MicroThread> AllLinkedListNode; // Also used as lock for "CompletionTask"
internal MicroThreadCallbackList Callbacks;
internal SynchronizationContext? SynchronizationContext;
Expand All @@ -34,7 +34,7 @@ public MicroThread(Scheduler scheduler, MicroThreadFlags flags = MicroThreadFlag
{
Id = Interlocked.Increment(ref globalCounterId);
Scheduler = scheduler;
ScheduledLinkedListNode = new PriorityQueueNode<SchedulerEntry>(new SchedulerEntry(this));
schedulerEntry = new() { MicroThread = this };
AllLinkedListNode = new LinkedListNode<MicroThread>(this);
ScheduleMode = ScheduleMode.Last;
Flags = flags;
Expand All @@ -50,11 +50,14 @@ public MicroThread(Scheduler scheduler, MicroThreadFlags flags = MicroThreadFlag
/// </value>
public long Priority
{
get { return ScheduledLinkedListNode.Value.Priority; }
get { return priority; }
set
{
if (ScheduledLinkedListNode.Value.Priority != value)
Reschedule(ScheduleMode.First, value);
if (priority != value)
{
priority = value;
Scheduler.Reschedule(schedulerEntry, priority, ScheduleMode.First);
}
}
}

Expand Down Expand Up @@ -215,7 +218,7 @@ public void Start(Func<Task> microThreadFunction, ScheduleMode scheduleMode = Sc
/// <returns>Task.</returns>
public async Task Run()
{
Reschedule(ScheduleMode.First, Priority);
Scheduler.Reschedule(schedulerEntry, Priority, ScheduleMode.First);
var currentScheduler = Scheduler.Current;
if (currentScheduler == Scheduler)
await Scheduler.Yield();
Expand Down Expand Up @@ -251,69 +254,21 @@ internal void SetException(Exception exception)
State = (exception is OperationCanceledException) ? MicroThreadState.Canceled : MicroThreadState.Failed;
}

internal void Reschedule(ScheduleMode scheduleMode, long newPriority)
{
lock (Scheduler.ScheduledEntries)
{
if (ScheduledLinkedListNode.Index != -1)
{
Scheduler.ScheduledEntries.Remove(ScheduledLinkedListNode);
ScheduledLinkedListNode.Value.Priority = newPriority;
Scheduler.Schedule(ScheduledLinkedListNode, scheduleMode);
}
else
{
ScheduledLinkedListNode.Value.Priority = newPriority;
}
}
}

internal void ScheduleContinuation(ScheduleMode scheduleMode, SendOrPostCallback callback, object? callbackState)
{
Debug.Assert(callback != null);
lock (Scheduler.ScheduledEntries)
{
var node = NewCallback();
node.SendOrPostCallback = callback;
node.CallbackState = callbackState;
Callbacks.Add(node);

if (ScheduledLinkedListNode.Index == -1)
Scheduler.Schedule(ScheduledLinkedListNode, scheduleMode);
}
var node = Scheduler.NewCallback();
node.SendOrPostCallback = callback;
node.CallbackState = callbackState;
Scheduler.Schedule(ref Callbacks, node, schedulerEntry, priority, scheduleMode);
}

internal void ScheduleContinuation(ScheduleMode scheduleMode, Action callback)
{
Debug.Assert(callback != null);
lock (Scheduler.ScheduledEntries)
{
var node = NewCallback();
node.MicroThreadAction = callback;
Callbacks.Add(node);

if (ScheduledLinkedListNode.Index == -1)
Scheduler.Schedule(ScheduledLinkedListNode, scheduleMode);
}
}

private MicroThreadCallbackNode NewCallback()
{
MicroThreadCallbackNode node;
var pool = Scheduler.CallbackNodePool;

if (Scheduler.CallbackNodePool.Count > 0)
{
var index = pool.Count - 1;
node = pool[index];
pool.RemoveAt(index);
}
else
{
node = new MicroThreadCallbackNode();
}

return node;
var node = Scheduler.NewCallback();
node.MicroThreadAction = callback;
Scheduler.Schedule(ref Callbacks, node, schedulerEntry, priority, scheduleMode);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@ public readonly bool IsCompleted
if (microThread.IsOver)
return true;

lock (microThread.Scheduler.ScheduledEntries)
{
return microThread.Scheduler.ScheduledEntries.Count == 0;
}
return microThread.Scheduler.HasNoEntriesScheduled();
}
}

Expand Down
Loading
Loading