Skip to content

Commit ac542a8

Browse files
authored
Merge pull request #13 from jhickson/fixpauseandresume
Fixed naive approach to pause/resume
2 parents 56cb084 + 344de52 commit ac542a8

File tree

7 files changed

+216
-75
lines changed

7 files changed

+216
-75
lines changed

Winton.Extensions.Threading.Actor.Tests.Unit/ActorTests.cs

Lines changed: 61 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -875,74 +875,86 @@ public void ShouldStopActorAndNotProcessAnyAlreadyEnqueuedWorkIfStartWorkCancell
875875
attempts.Should().Be(0);
876876
}
877877

878-
[Fact]
879-
public async Task ShouldBeAbleToPauseActorUntilResumeFromAwait()
878+
[Theory]
879+
[InlineData(ActorEnqueueOptions.Default, ActorEnqueueOptions.Default)]
880+
[InlineData(ActorEnqueueOptions.WorkIsLongRunning, ActorEnqueueOptions.Default)]
881+
[InlineData(ActorEnqueueOptions.WorkIsLongRunning, ActorEnqueueOptions.WorkIsLongRunning)]
882+
[InlineData(ActorEnqueueOptions.Default, ActorEnqueueOptions.WorkIsLongRunning)]
883+
public async Task ShouldBeAbleToPauseActorUntilResumeFromAwait(ActorEnqueueOptions awaiterOptions, ActorEnqueueOptions otherOptions)
880884
{
881-
var actor = CreateActor();
882-
var numbers = new List<int>();
885+
// Do this repeatedly to try to expose race conditions in the pausing logic
886+
for (var i = 0; i < 1000; i++)
887+
{
888+
var actor = new Actor();
889+
var numbers = new List<int>();
883890

884-
await actor.Start();
891+
await actor.Start();
885892

886-
var tasks =
887-
new[]
888-
{
893+
var tasks = new Task[3];
894+
895+
tasks[0] =
889896
actor.Enqueue(
890897
async () =>
891898
{
892-
numbers.Add(1);
893-
894-
await Task.Delay(TimeSpan.FromSeconds(1)).WhileActorPaused();
895-
896-
numbers.Add(2);
897-
}),
898-
actor.Enqueue(() => numbers.Add(3)),
899-
actor.Enqueue(() => numbers.Add(4)),
900-
actor.Enqueue(() => numbers.Add(5))
901-
};
899+
numbers.Add(i);
900+
var task = Task.Run(() => numbers.Add(i + 1));
901+
await task.WhileActorPaused();
902+
numbers.Add(i + 2);
903+
}, awaiterOptions);
904+
tasks[1] = actor.Enqueue(() => numbers.Add(i + 3), otherOptions);
905+
tasks[2] = actor.Enqueue(() => numbers.Add(i + 4), otherOptions);
906+
907+
foreach (var task in tasks)
908+
{
909+
await task;
910+
}
902911

903-
await Task.WhenAll(tasks);
912+
await actor.Stop();
904913

905-
numbers.Should().Equal(1, 2, 3, 4, 5);
914+
numbers.Should().Equal(i, i + 1, i + 2, i + 3, i + 4);
915+
}
906916
}
907917

908-
[Fact]
909-
public async Task ShouldBeAbleToPauseActorUntilResumeFromAwaitReturningData()
918+
[Theory]
919+
[InlineData(ActorEnqueueOptions.Default, ActorEnqueueOptions.Default)]
920+
[InlineData(ActorEnqueueOptions.WorkIsLongRunning, ActorEnqueueOptions.Default)]
921+
[InlineData(ActorEnqueueOptions.WorkIsLongRunning, ActorEnqueueOptions.WorkIsLongRunning)]
922+
[InlineData(ActorEnqueueOptions.Default, ActorEnqueueOptions.WorkIsLongRunning)]
923+
public async Task ShouldBeAbleToPauseActorUntilResumeFromAwaitReturningData(ActorEnqueueOptions awaiterOptions, ActorEnqueueOptions otherOptions)
910924
{
911-
var actor = CreateActor();
912-
var numbers = new List<int>();
913-
var promise = new TaskCompletionSource<int>();
925+
// Do this repeatedly to try to expose race conditions in the pausing logic
926+
for (var i = 0; i < 1000; i++)
927+
{
928+
var actor = CreateActor();
929+
var numbers = new List<int>();
914930

915-
await actor.Start();
931+
await actor.Start();
916932

917-
var tasks =
918-
new[]
919-
{
933+
var tasks = new Task[4];
934+
935+
tasks[0] =
920936
actor.Enqueue(
921-
async () =>
922-
{
923-
numbers.Add(1);
937+
async () =>
938+
{
939+
numbers.Add(i);
924940

925-
var next = await promise.Task.WhileActorPaused();
941+
var next = await Task.Run(() => i + 1).WhileActorPaused();
926942

927-
numbers.Add(next);
928-
}),
929-
actor.Enqueue(() => numbers.Add(3)),
930-
actor.Enqueue(() => numbers.Add(4)),
931-
actor.Enqueue(() => numbers.Add(5))
932-
};
943+
numbers.Add(next);
944+
}, awaiterOptions);
945+
tasks[1] = actor.Enqueue(() => numbers.Add(i + 2), otherOptions);
946+
tasks[2] = actor.Enqueue(() => numbers.Add(i + 3), otherOptions);
947+
tasks[3] = actor.Enqueue(() => numbers.Add(i + 4), otherOptions);
933948

934-
var promiseSetter =
935-
Task.Run(
936-
async () =>
937-
{
938-
await Task.Delay(TimeSpan.FromSeconds(1));
939-
promise.SetResult(2);
940-
});
949+
foreach (var task in tasks)
950+
{
951+
await task;
952+
}
941953

942-
await Task.WhenAll(tasks);
943-
await promiseSetter;
954+
await actor.Stop();
944955

945-
numbers.Should().Equal(1, 2, 3, 4, 5);
956+
numbers.Should().Equal(i, i + 1, i + 2, i + 3, i + 4);
957+
}
946958
}
947959

948960
[Flags]

Winton.Extensions.Threading.Actor.sln

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11

22
Microsoft Visual Studio Solution File, Format Version 12.00
33
# Visual Studio 15
4-
VisualStudioVersion = 15.0.26228.4
4+
VisualStudioVersion = 15.0.26430.13
55
MinimumVisualStudioVersion = 10.0.40219.1
66
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Libraries", "Libraries", "{F9C63802-0833-4F6F-A220-3193ACA0FC67}"
77
EndProject
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
using System;
2+
using System.Runtime.CompilerServices;
3+
using System.Threading.Tasks;
4+
5+
namespace Winton.Extensions.Threading.Actor.Internal
6+
{
7+
/// <summary>
8+
/// Awaitable for controlling the pause/resume logic. Forces all awaits requiring a pause to be handled asynchronously
9+
/// to simplify matters elsewhere.
10+
/// </summary>
11+
public struct ActorPauseAwaitable : ICriticalNotifyCompletion
12+
{
13+
private readonly TaskAwaiter _taskAwaiter;
14+
15+
public ActorPauseAwaitable GetAwaiter()
16+
{
17+
return this;
18+
}
19+
20+
internal ActorPauseAwaitable(Task task)
21+
{
22+
_taskAwaiter = task.GetAwaiter();
23+
IsCompleted = false;
24+
}
25+
26+
public bool IsCompleted { get; }
27+
28+
public void OnCompleted(Action continuation)
29+
{
30+
_taskAwaiter.OnCompleted(continuation);
31+
}
32+
33+
public void UnsafeOnCompleted(Action continuation)
34+
{
35+
_taskAwaiter.UnsafeOnCompleted(continuation);
36+
}
37+
38+
public void GetResult()
39+
{
40+
_taskAwaiter.GetResult();
41+
}
42+
}
43+
44+
/// <summary>
45+
/// Awaitable for controlling the pause/resume logic. Forces all awaits requiring a pause to be handled asynchronously
46+
/// to simplify matters elsewhere.
47+
/// </summary>
48+
public struct ActorPauseAwaitable<TResult> : ICriticalNotifyCompletion
49+
{
50+
private readonly TaskAwaiter<TResult> _taskAwaiter;
51+
52+
public ActorPauseAwaitable<TResult> GetAwaiter()
53+
{
54+
return this;
55+
}
56+
57+
internal ActorPauseAwaitable(Task<TResult> task)
58+
{
59+
_taskAwaiter = task.GetAwaiter();
60+
IsCompleted = false;
61+
}
62+
63+
public bool IsCompleted { get; }
64+
65+
public void OnCompleted(Action continuation)
66+
{
67+
_taskAwaiter.OnCompleted(continuation);
68+
}
69+
70+
public void UnsafeOnCompleted(Action continuation)
71+
{
72+
_taskAwaiter.UnsafeOnCompleted(continuation);
73+
}
74+
75+
public TResult GetResult()
76+
{
77+
return _taskAwaiter.GetResult();
78+
}
79+
}
80+
}

Winton.Extensions.Threading.Actor/Internal/ActorTaskScheduler.cs

Lines changed: 55 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ internal sealed class ActorTaskScheduler : TaskScheduler
1818
private readonly IWorkItemQueuer _workItemQueuer;
1919

2020
private ActorSynchronizationContext _resumingSynchronizationContext;
21+
private Task _resumingTask;
2122

2223
public ActorTaskScheduler(ActorId actorId, IWorkItemQueuer workItemQueuer, IActorTaskFactory actorTaskFactory)
2324
{
@@ -38,18 +39,30 @@ public void Terminate()
3839
}
3940
}
4041

41-
public void WhileActorPaused(Task task)
42+
public ActorPauseAwaitable WhileActorPaused(Task task)
43+
{
44+
Pause();
45+
return new ActorPauseAwaitable(task);
46+
}
47+
48+
public ActorPauseAwaitable<T> WhileActorPaused<T>(Task<T> task)
49+
{
50+
Pause();
51+
return new ActorPauseAwaitable<T>(task);
52+
}
53+
54+
private bool HaveActiveThread { get; set; }
55+
56+
private void Pause()
4257
{
4358
lock (_lockObject)
4459
{
45-
if (_statusManager.State == ActorTaskSchedulerStatus.Terminated)
60+
if (_statusManager.State != ActorTaskSchedulerStatus.Terminated)
4661
{
47-
return;
62+
_resumingSynchronizationContext = _resumingSynchronizationContext ?? _synchronizationContext.ChangeActorTaskKind(ActorTaskKind.Resumer);
63+
SynchronizationContext.SetSynchronizationContext(_resumingSynchronizationContext);
64+
_statusManager.MarkPaused();
4865
}
49-
50-
_resumingSynchronizationContext = _resumingSynchronizationContext ?? _synchronizationContext.ChangeActorTaskKind(ActorTaskKind.Resumer);
51-
SynchronizationContext.SetSynchronizationContext(_resumingSynchronizationContext);
52-
_statusManager.MarkPaused();
5366
}
5467
}
5568

@@ -75,7 +88,15 @@ protected override void QueueTask(Task task)
7588
if (actorTaskContext.Kind == ActorTaskKind.Resumer)
7689
{
7790
_statusManager.MarkActive();
78-
LaunchNew(task);
91+
92+
if (HaveActiveThread)
93+
{
94+
_resumingTask = task;
95+
}
96+
else
97+
{
98+
LaunchNew(task);
99+
}
79100
}
80101
else
81102
{
@@ -108,6 +129,8 @@ private enum YieldReason
108129

109130
private void LaunchNew(Task task)
110131
{
132+
HaveActiveThread = true;
133+
111134
if (!task.CreationOptions.HasFlag(TaskCreationOptions.LongRunning))
112135
{
113136
_workItemQueuer.QueueOnThreadPoolThread(
@@ -121,6 +144,8 @@ private void LaunchNew(Task task)
121144

122145
lock (_lockObject)
123146
{
147+
HaveActiveThread = false;
148+
124149
switch (CheckForReasonToYield())
125150
{
126151
case YieldReason.Paused:
@@ -132,7 +157,7 @@ private void LaunchNew(Task task)
132157
_statusManager.MarkInactive();
133158
break;
134159
case YieldReason.None:
135-
LaunchNew(_taskQueue.Dequeue());
160+
LaunchNew(GetNextTask());
136161
break;
137162
}
138163
}
@@ -143,37 +168,49 @@ private void LaunchNew(Task task)
143168
_workItemQueuer.QueueOnNonThreadPoolThread(
144169
() =>
145170
{
171+
var continueExecuting = true;
146172
PrepareForExecute();
147173

148-
while (true)
174+
while (continueExecuting)
149175
{
150176
TryExecuteTask(task);
151177

152178
lock (_lockObject)
153179
{
180+
continueExecuting = false;
181+
154182
switch (CheckForReasonToYield())
155183
{
156184
case YieldReason.Paused:
157-
CleanUpAfterExecute();
158-
return;
185+
break;
159186
case YieldReason.Terminated:
160-
CleanUpAfterExecute();
161187
ClearTaskQueue();
162-
return;
188+
break;
163189
case YieldReason.QueueEmpty:
164-
CleanUpAfterExecute();
165190
_statusManager.MarkInactive();
166-
return;
191+
break;
167192
case YieldReason.None:
168-
task = _taskQueue.Dequeue();
193+
task = GetNextTask();
194+
continueExecuting = true;
169195
break;
170196
}
197+
198+
HaveActiveThread = continueExecuting;
171199
}
172200
}
201+
202+
CleanUpAfterExecute();
173203
});
174204
}
175205
}
176206

207+
private Task GetNextTask()
208+
{
209+
var nextTask = _resumingTask ?? _taskQueue.Dequeue();
210+
_resumingTask = null;
211+
return nextTask;
212+
}
213+
177214
private void ClearTaskQueue()
178215
{
179216
while (_taskQueue.Count > 0)
@@ -208,7 +245,7 @@ private YieldReason CheckForReasonToYield()
208245
return YieldReason.Paused;
209246
}
210247

211-
if (_taskQueue.Count == 0)
248+
if (_resumingTask == null && _taskQueue.Count == 0)
212249
{
213250
return YieldReason.QueueEmpty;
214251
}

Winton.Extensions.Threading.Actor/Internal/ActorWorkScheduler.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ private async Task Schedule(Func<Task> enqueuer, TimeSpan interval, ActorSchedul
8282
await GetDelay(interval, cancellationToken).ConfigureAwait(false);
8383
await enqueuer().ConfigureAwait(false);
8484
}
85+
86+
cancellationToken.ThrowIfCancellationRequested();
8587
}
8688

8789
private Task GetDelay(TimeSpan interval, CancellationToken cancellationToken)

0 commit comments

Comments
 (0)