Skip to content

Commit 173bee3

Browse files
authored
Re-use thread pool threads (#27)
This is as opposed to queuing each actor task on the thread pool. This improves performance in many cases.
1 parent fc37902 commit 173bee3

File tree

3 files changed

+45
-58
lines changed

3 files changed

+45
-58
lines changed

Winton.Extensions.Threading.Actor.Tests.Unit/Internal/ActorWorkSchedulerTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ public void ShouldBeAbleToConfigureScheduleToRescheduleInCaseOfUnexpectedErrorBu
279279
throw new Exception($"Unhandled test case {workType}.");
280280
}
281281

282-
Within.FiveSeconds(() => times.Should().HaveCount(4));
282+
Within.FiveSeconds(() => times.Count.Should().BeGreaterOrEqualTo(4));
283283

284284
emittedException.Should().BeOfType<InvalidOperationException>().Which.Message.Should().Be("Pah!");
285285

Winton.Extensions.Threading.Actor.Tests.Unit/Winton.Extensions.Threading.Actor.Tests.Unit.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
</PropertyGroup>
1111

1212
<ItemGroup>
13-
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.5.0" />
13+
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.6.0" />
1414
<PackageReference Include="Moq" Version="4.8.1" />
1515
<PackageReference Include="xunit" Version="2.3.1" />
1616
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" />

Winton.Extensions.Threading.Actor/Internal/ActorTaskScheduler.cs

Lines changed: 43 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,6 @@ internal sealed class ActorTaskScheduler : TaskScheduler
1313
[ThreadStatic]
1414
private static bool _shouldPause;
1515

16-
#if NETSTANDARD1_3
17-
[ThreadStatic]
18-
private static bool _isNonThreadPoolThread;
19-
#endif
20-
2116
private readonly ActorId _actorId;
2217
private readonly object _lockObject = new object();
2318
private readonly ActorSynchronizationContext _synchronizationContext;
@@ -164,95 +159,87 @@ private void QueueForExecutionOffThreadPool(ActorTask actorTask)
164159
{
165160
_thread?.Join();
166161
_thread =
167-
#if NETSTANDARD1_3
168-
new Thread(x =>
169-
{
170-
_isNonThreadPoolThread = true;
171-
Execute(x);
172-
})
173-
#else
174162
new Thread(Execute)
175-
#endif
176163
{
177164
IsBackground = true,
178165
Name = "ActorWorker"
179166
};
180167
_thread.Start(actorTask);
181168
}
182169

183-
private void QueueOnThreadPool(ActorTask actorTask)
184-
{
170+
private void QueueOnThreadPool(ActorTask actorTask) =>
185171
#if NETSTANDARD1_3
186172
ThreadPool.QueueUserWorkItem(Execute, actorTask);
187173
#else
188174
ThreadPool.UnsafeQueueUserWorkItem(Execute, actorTask);
189175
#endif
190-
}
191176

192177
private void Execute(object state)
193178
{
194179
var previousSynchronizationContext = PrepareThreadForExecute();
195-
196180
var actorTask = (ActorTask)state;
197-
var task = actorTask.Task;
181+
var onLongRunningThread = (actorTask.Task.CreationOptions & TaskCreationOptions.LongRunning) == TaskCreationOptions.LongRunning;
182+
var switchThreadType = false;
198183

199-
var isTerminalTask = (actorTask.Traits & ActorTaskTraits.Terminal) != 0;
184+
while (!(actorTask is null) && !switchThreadType)
185+
{
186+
var task = actorTask.Task;
187+
var isTerminalTask = (actorTask.Traits & ActorTaskTraits.Terminal) != 0;
200188

201-
TryExecuteTask(task);
189+
TryExecuteTask(task);
202190

203-
actorTask.CleanUpPostExecute();
191+
actorTask.CleanUpPostExecute();
204192

205-
CleanUpThreadAfterExecute(previousSynchronizationContext);
193+
var shouldTerminate = isTerminalTask || (actorTask.Traits & ActorTaskTraits.Critical) != 0 && task.Status != TaskStatus.RanToCompletion;
206194

207-
var shouldTerminate = isTerminalTask || (actorTask.Traits & ActorTaskTraits.Critical) != 0 && task.Status != TaskStatus.RanToCompletion;
195+
actorTask = null;
208196

209-
TakeLock();
197+
TakeLock();
210198

211-
if (shouldTerminate)
212-
{
213-
_status = ActorTaskSchedulerStatus.Terminated;
214-
ReleaseLock();
215-
TerminationCleanUp(isTerminalTask ? task : null);
216-
}
217-
else if (_shouldPause && (_front is null || (_front.Traits & ActorTaskTraits.Resuming) == 0))
218-
{
219-
_status = ActorTaskSchedulerStatus.Paused;
220-
ReleaseLock();
221-
}
222-
else
223-
{
224-
var nextTask = _front;
225-
226-
if (nextTask is null)
199+
if (shouldTerminate)
227200
{
228-
_status = ActorTaskSchedulerStatus.Inactive;
201+
_status = ActorTaskSchedulerStatus.Terminated;
202+
ReleaseLock();
203+
TerminationCleanUp(isTerminalTask ? task : null);
204+
}
205+
else if (_shouldPause && (_front is null || (_front.Traits & ActorTaskTraits.Resuming) == 0))
206+
{
207+
_status = ActorTaskSchedulerStatus.Paused;
229208
ReleaseLock();
230209
}
231210
else
232211
{
233-
_front = _front.Next;
234-
235212
if (_front is null)
236213
{
237-
_back = null;
238-
}
239-
240-
ReleaseLock();
241-
242-
#if NETSTANDARD1_3
243-
if (_isNonThreadPoolThread)
244-
#else
245-
if (!Thread.CurrentThread.IsThreadPoolThread)
246-
#endif
247-
{
248-
Execute(nextTask);
214+
_status = ActorTaskSchedulerStatus.Inactive;
249215
}
250216
else
251217
{
252-
QueueForExecution(nextTask);
218+
_shouldPause = false;
219+
actorTask = _front;
220+
_front = _front.Next;
221+
222+
if (_front is null)
223+
{
224+
_back = null;
225+
}
226+
227+
if (!onLongRunningThread)
228+
{
229+
switchThreadType = (actorTask.Task.CreationOptions & TaskCreationOptions.LongRunning) == TaskCreationOptions.LongRunning;
230+
}
253231
}
232+
233+
ReleaseLock();
254234
}
255235
}
236+
237+
CleanUpThreadAfterExecute(previousSynchronizationContext);
238+
239+
if (switchThreadType)
240+
{
241+
QueueForExecution(actorTask);
242+
}
256243
}
257244

258245
private void TerminationCleanUp(Task terminalTask)

0 commit comments

Comments
 (0)