Skip to content

Commit e580f87

Browse files
authored
Add support for a stopped token (#25)
* Add CancellationToken-based companion to StoppedTask. * Test improvements * Travis improvements
1 parent ca929f3 commit e580f87

File tree

5 files changed

+116
-60
lines changed

5 files changed

+116
-60
lines changed

Winton.Extensions.Threading.Actor.Tests.Unit/ActorTests.cs

Lines changed: 48 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,35 @@ public async Task ShouldCompleteStoppedTaskWhenStopCompleted()
419419
stageOrder.Should().Equal(expectedStageOrder);
420420
}
421421

422+
[Fact]
423+
public async Task ShouldCancelStoppedTokenWhenStopCompleted()
424+
{
425+
var stageOrder = new List<string>();
426+
var actor = CreateActor(x =>
427+
{
428+
x.StartWork = new ActorStartWork(() => stageOrder.Add("Start"));
429+
x.StopWork = new ActorStopWork(
430+
() =>
431+
{
432+
Thread.Sleep(TimeSpan.FromMilliseconds(250));
433+
stageOrder.Add("Stop");
434+
});
435+
},
436+
ActorCreateOptions.None);
437+
438+
await actor.Start();
439+
440+
var cancelledPromise = new TaskCompletionSource<object>();
441+
442+
var cancellationRegistrationToken = actor.StoppedToken().Register(() => cancelledPromise.SetResult(null));
443+
444+
await actor.Stop();
445+
446+
cancelledPromise.Task.Wait(TimeSpan.FromMilliseconds(1000)).Should().BeTrue();
447+
448+
cancellationRegistrationToken.Dispose();
449+
}
450+
422451
[Theory]
423452
[InlineData(ResumeTestCase.AwaitOnTaskFactoryScheduledTask, StopWorkOutcome.Completes)]
424453
[InlineData(ResumeTestCase.AwaitOnTaskFactoryScheduledTask, StopWorkOutcome.Faults)]
@@ -446,27 +475,23 @@ public void ShouldNotBeAbleToResumeWorkAfterStop(ResumeTestCase resumeTestCase,
446475
"PostTriggerWait"
447476
};
448477

478+
Func<int> offActorWork =
479+
() =>
480+
{
481+
stages.Add("PreTriggerWait");
482+
pretrigger.SetResult(true);
483+
ThrowIfWaitTimesOut(trigger.Task);
484+
stages.Add("PostTriggerWait");
485+
return 345;
486+
};
487+
449488
switch (resumeTestCase)
450489
{
451490
case ResumeTestCase.AwaitOnSecondActor:
452-
suspendWork = () => actor2.Enqueue(() =>
453-
{
454-
stages.Add("PreTriggerWait");
455-
pretrigger.SetResult(true);
456-
ThrowIfWaitTimesOut(trigger.Task);
457-
stages.Add("PostTriggerWait");
458-
return 345;
459-
});
491+
suspendWork = () => actor2.Enqueue(offActorWork);
460492
break;
461493
case ResumeTestCase.AwaitOnTaskFactoryScheduledTask:
462-
suspendWork = () => new TaskFactory(TaskScheduler.Default).StartNew(() =>
463-
{
464-
stages.Add("PreTriggerWait");
465-
pretrigger.SetResult(true);
466-
ThrowIfWaitTimesOut(trigger.Task);
467-
stages.Add("PostTriggerWait");
468-
return 345;
469-
});
494+
suspendWork = () => new TaskFactory(TaskScheduler.Default).StartNew(offActorWork);
470495
break;
471496
default:
472497
throw new Exception($"Unhandled test case {resumeTestCase}.");
@@ -755,7 +780,7 @@ public void ShouldNotBeAbleToSpecifyStopWorkOnceActorStarted()
755780
[Theory]
756781
[InlineData(true)]
757782
[InlineData(false)]
758-
public void ShouldBeAbleToCancelAnyEnqueuedWork(bool delayStart)
783+
public async Task ShouldBeAbleToCancelAnyEnqueuedWork(bool delayStart)
759784
{
760785
var actor = CreateActor(delayStart ? ActorCreateOptions.None : ActorCreateOptions.Start);
761786

@@ -802,13 +827,13 @@ public void ShouldBeAbleToCancelAnyEnqueuedWork(bool delayStart)
802827

803828
if (delayStart)
804829
{
805-
actor.Start();
830+
await actor.Start();
806831
}
807832

808-
task1.AwaitingShouldCompleteIn(_waitTimeout);
809-
task4.AwaitingShouldCompleteIn(_waitTimeout);
810-
task6.AwaitingShouldCompleteIn(_waitTimeout);
811-
task8.AwaitingShouldCompleteIn(_waitTimeout);
833+
await task1;
834+
await task4;
835+
await task6;
836+
await task8;
812837
ShouldBeCancelled(task2);
813838
ShouldBeCancelled(task3);
814839
ShouldBeCancelled(task5);
@@ -993,7 +1018,7 @@ public async Task ShouldBeAbleToPauseActorUntilResumeFromAwaitReturningData(Acto
9931018
}
9941019

9951020
[Fact]
996-
public async Task ShutdownShouldReturnImmediatelyIfStartWorkFails()
1021+
public async Task StopShouldNotRunStopWorkIfStartWorkFails()
9971022
{
9981023
var stopWorkCalled = false;
9991024
var actor =
@@ -1005,11 +1030,8 @@ public async Task ShutdownShouldReturnImmediatelyIfStartWorkFails()
10051030

10061031
actor.Awaiting(async x => await x.Start()).ShouldThrow<Exception>().WithMessage("Error.");
10071032

1008-
var stopperThreadId = Thread.CurrentThread.ManagedThreadId;
1009-
10101033
await actor.Stop();
10111034

1012-
Thread.CurrentThread.ManagedThreadId.Should().Be(stopperThreadId);
10131035
stopWorkCalled.Should().BeFalse();
10141036
}
10151037

Winton.Extensions.Threading.Actor/ActorExtensions.cs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,5 +202,32 @@ public static Task<T> Enqueue<T>(this IActor self, Func<Task<T>> work, Cancellat
202202
{
203203
return self.Enqueue(work, cancellationToken, ActorEnqueueOptions.Default);
204204
}
205+
206+
/// <summary>
207+
/// Returns a <see cref="CancellationToken"/> that is cancelled when the given actor stops.
208+
/// </summary>
209+
/// <param name="self">The actor.</param>
210+
/// <returns></returns>
211+
public static CancellationToken StoppedToken(this IActor self)
212+
{
213+
var cancellationTokenSource = new CancellationTokenSource();
214+
215+
Task.Run(
216+
async () =>
217+
{
218+
try
219+
{
220+
await self.StoppedTask;
221+
}
222+
catch
223+
{
224+
}
225+
226+
cancellationTokenSource.Cancel();
227+
cancellationTokenSource.Dispose();
228+
});
229+
230+
return cancellationTokenSource.Token;
231+
}
205232
}
206233
}

Winton.Extensions.Threading.Actor/Internal/StateMachine/StoppingActorState.cs

Lines changed: 38 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -31,36 +31,47 @@ protected override void EnterImpl()
3131

3232
Context.StartCompletionSource.TrySetResult(true);
3333

34-
var finalWork =
35-
(Action)(() =>
36-
{
37-
try
38-
{
39-
Context.StopWork.CancellationToken.ThrowIfCancellationRequested();
34+
Action finalWork =
35+
() =>
36+
{
37+
try
38+
{
39+
Context.StopWork.CancellationToken.ThrowIfCancellationRequested();
4040

41-
if (runStopWork)
42-
{
43-
Context.StopWork.SyncWork();
44-
}
41+
if (runStopWork)
42+
{
43+
Context.StopWork.SyncWork();
44+
}
45+
}
46+
finally
47+
{
48+
Context.TerminateTaskScheduler();
49+
}
50+
};
51+
var finalTask = Context.ActorTaskFactory.Create(finalWork, CancellationToken.None, Context.StopWork.TaskCreationOptions);
4552

46-
Context.StopCompletionSource.SetResult(true);
47-
}
48-
catch (OperationCanceledException)
49-
{
50-
Context.StopCompletionSource.SetCanceled();
51-
}
52-
catch (Exception exception)
53-
{
54-
Context.StopCompletionSource.SetException(exception);
55-
}
56-
finally
57-
{
58-
Context.TerminateTaskScheduler();
59-
Context.SetState<StoppedActorState>();
60-
}
61-
});
53+
Task.Run(async () =>
54+
{
55+
try
56+
{
57+
await finalTask;
58+
Context.StopCompletionSource.SetResult(true);
59+
}
60+
catch (OperationCanceledException)
61+
{
62+
Context.StopCompletionSource.SetCanceled();
63+
}
64+
catch (Exception exception)
65+
{
66+
Context.StopCompletionSource.SetException(exception);
67+
}
68+
finally
69+
{
70+
Context.SetState<StoppedActorState>();
71+
}
72+
});
6273

63-
Context.StartTask(Context.ActorTaskFactory.Create(finalWork, CancellationToken.None, Context.StopWork.TaskCreationOptions));
74+
Context.StartTask(finalTask);
6475

6576
foreach (var task in Context.InitialWorkQueue.Concat(Context.InitialWorkToBeCancelledQueue))
6677
{

build.ps1

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,5 @@ function Exec
2525
exec { & dotnet restore }
2626
exec { & dotnet clean }
2727
exec { & dotnet build --configuration Release }
28-
exec { & dotnet test --no-build --configuration Release Winton.Extensions.Threading.Actor.Tests.Unit\Winton.Extensions.Threading.Actor.Tests.Unit.csproj }
29-
exec { & dotnet pack --no-build Winton.Extensions.Threading.Actor\Winton.Extensions.Threading.Actor.csproj --configuration Release }
28+
exec { & dotnet test --no-build --no-restore --configuration Release Winton.Extensions.Threading.Actor.Tests.Unit\Winton.Extensions.Threading.Actor.Tests.Unit.csproj }
29+
exec { & dotnet pack --no-build --no-restore Winton.Extensions.Threading.Actor\Winton.Extensions.Threading.Actor.csproj --configuration Release }

build.sh

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,6 @@ echo "Cleaning ..."
2020
dotnet clean
2121
echo
2222

23-
echo "Restoring ..."
24-
dotnet restore
25-
echo
26-
2723
echo "Building ..."
2824
dotnet build ./Winton.Extensions.Threading.Actor/Winton.Extensions.Threading.Actor.csproj --configuration Release --framework netstandard1.3
2925
echo
@@ -34,5 +30,5 @@ echo
3430

3531
if [ "${TRAVIS:-}" != "true" ]; then
3632
echo "Packing ..."
37-
dotnet pack ./Winton.Extensions.Threading.Actor/Winton.Extensions.Threading.Actor.csproj --no-build --configuration Release
33+
dotnet pack ./Winton.Extensions.Threading.Actor/Winton.Extensions.Threading.Actor.csproj --no-build --no-restore --configuration Release
3834
fi

0 commit comments

Comments
 (0)