Skip to content

Commit 447bd79

Browse files
authored
Merge pull request #1401 from danielgerlag/copilot/fix-stopasync-task-completion
Fix StopAsync to properly await async task completion in workflow steps
2 parents 32c4c37 + 0b73e57 commit 447bd79

File tree

2 files changed

+123
-0
lines changed

2 files changed

+123
-0
lines changed

src/WorkflowCore/Services/BackgroundTasks/QueueConsumer.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ internal abstract class QueueConsumer : IBackgroundTask
2424
protected Task DispatchTask;
2525
private CancellationTokenSource _cancellationTokenSource;
2626
private Dictionary<string, EventWaitHandle> _activeTasks;
27+
private List<Task> _runningTasks;
28+
private readonly object _runningTasksLock = new object();
2729
private ConcurrentHashSet<string> _secondPasses;
2830

2931
protected QueueConsumer(IQueueProvider queueProvider, ILoggerFactory loggerFactory, WorkflowOptions options)
@@ -33,6 +35,7 @@ protected QueueConsumer(IQueueProvider queueProvider, ILoggerFactory loggerFacto
3335
Logger = loggerFactory.CreateLogger(GetType());
3436

3537
_activeTasks = new Dictionary<string, EventWaitHandle>();
38+
_runningTasks = new List<Task>();
3639
_secondPasses = new ConcurrentHashSet<string>();
3740
}
3841

@@ -115,6 +118,10 @@ private async Task Execute()
115118
_activeTasks.Add(item, waitHandle);
116119
}
117120
var task = ExecuteItem(item, waitHandle, activity);
121+
lock (_runningTasksLock)
122+
{
123+
_runningTasks.Add(task);
124+
}
118125
}
119126
catch (OperationCanceledException)
120127
{
@@ -138,6 +145,25 @@ private async Task Execute()
138145

139146
foreach (var handle in toComplete)
140147
handle.WaitOne();
148+
149+
// Also await all running tasks to ensure proper async completion
150+
Task[] tasksToAwait;
151+
lock (_runningTasksLock)
152+
{
153+
tasksToAwait = _runningTasks.ToArray();
154+
}
155+
156+
if (tasksToAwait.Length > 0)
157+
{
158+
try
159+
{
160+
await Task.WhenAll(tasksToAwait);
161+
}
162+
catch
163+
{
164+
// Individual task exceptions are already logged in ExecuteItem
165+
}
166+
}
141167
}
142168

143169
private async Task ExecuteItem(string itemId, EventWaitHandle waitHandle, Activity activity)
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
using System;
2+
using System.Diagnostics;
3+
using System.Threading.Tasks;
4+
using WorkflowCore.Interface;
5+
using WorkflowCore.Models;
6+
using Xunit;
7+
using FluentAssertions;
8+
using Microsoft.Extensions.DependencyInjection;
9+
10+
namespace WorkflowCore.IntegrationTests.Scenarios
11+
{
12+
public class StopAsyncWorkflow : IWorkflow
13+
{
14+
internal static DateTime? StepStartTime = null;
15+
internal static DateTime? StepEndTime = null;
16+
17+
public string Id => "StopAsyncWorkflow";
18+
public int Version => 1;
19+
public void Build(IWorkflowBuilder<Object> builder)
20+
{
21+
builder
22+
.StartWith<LongRunningStep>();
23+
}
24+
}
25+
26+
internal class LongRunningStep : StepBodyAsync
27+
{
28+
public override async Task<ExecutionResult> RunAsync(IStepExecutionContext context)
29+
{
30+
StopAsyncWorkflow.StepStartTime = DateTime.Now;
31+
await Task.Delay(5000); // 5 second delay
32+
StopAsyncWorkflow.StepEndTime = DateTime.Now;
33+
return ExecutionResult.Next();
34+
}
35+
}
36+
37+
public class StopAsyncScenario : IDisposable
38+
{
39+
protected IWorkflowHost Host;
40+
protected IPersistenceProvider PersistenceProvider;
41+
42+
public StopAsyncScenario()
43+
{
44+
//setup dependency injection
45+
IServiceCollection services = new ServiceCollection();
46+
services.AddLogging();
47+
services.AddWorkflow(options => options.UsePollInterval(TimeSpan.FromSeconds(3)));
48+
49+
var serviceProvider = services.BuildServiceProvider();
50+
51+
PersistenceProvider = serviceProvider.GetService<IPersistenceProvider>();
52+
Host = serviceProvider.GetService<IWorkflowHost>();
53+
Host.RegisterWorkflow<StopAsyncWorkflow, Object>();
54+
Host.Start();
55+
}
56+
57+
[Fact]
58+
public async Task StopAsync_should_wait_for_running_steps_to_complete()
59+
{
60+
// Arrange
61+
StopAsyncWorkflow.StepStartTime = null;
62+
StopAsyncWorkflow.StepEndTime = null;
63+
64+
// Start a workflow with a long-running step
65+
var workflowId = await Host.StartWorkflow<Object>("StopAsyncWorkflow", null);
66+
67+
// Wait for the step to start executing
68+
var waitCount = 0;
69+
while (StopAsyncWorkflow.StepStartTime == null && waitCount < 50)
70+
{
71+
await Task.Delay(100);
72+
waitCount++;
73+
}
74+
75+
StopAsyncWorkflow.StepStartTime.Should().NotBeNull("the step should have started before stopping");
76+
StopAsyncWorkflow.StepEndTime.Should().BeNull("the step should still be running");
77+
78+
// Act - Call StopAsync which should wait for the step to complete
79+
var stopwatch = Stopwatch.StartNew();
80+
await Host.StopAsync(default);
81+
stopwatch.Stop();
82+
83+
// Assert
84+
// The step should have completed
85+
StopAsyncWorkflow.StepEndTime.Should().NotBeNull("the step should have completed before StopAsync returned");
86+
87+
// StopAsync should have taken at least 3 seconds (the remaining delay time)
88+
stopwatch.ElapsedMilliseconds.Should().BeGreaterOrEqualTo(3000,
89+
"StopAsync should wait for the running step to complete");
90+
}
91+
92+
public void Dispose()
93+
{
94+
// Dispose is intentionally empty to avoid double-stop
95+
}
96+
}
97+
}

0 commit comments

Comments
 (0)