diff --git a/src/WorkflowCore/Services/BackgroundTasks/QueueConsumer.cs b/src/WorkflowCore/Services/BackgroundTasks/QueueConsumer.cs index 75a7583ad..8295ea983 100644 --- a/src/WorkflowCore/Services/BackgroundTasks/QueueConsumer.cs +++ b/src/WorkflowCore/Services/BackgroundTasks/QueueConsumer.cs @@ -24,6 +24,8 @@ internal abstract class QueueConsumer : IBackgroundTask protected Task DispatchTask; private CancellationTokenSource _cancellationTokenSource; private Dictionary _activeTasks; + private List _runningTasks; + private readonly object _runningTasksLock = new object(); private ConcurrentHashSet _secondPasses; protected QueueConsumer(IQueueProvider queueProvider, ILoggerFactory loggerFactory, WorkflowOptions options) @@ -33,6 +35,7 @@ protected QueueConsumer(IQueueProvider queueProvider, ILoggerFactory loggerFacto Logger = loggerFactory.CreateLogger(GetType()); _activeTasks = new Dictionary(); + _runningTasks = new List(); _secondPasses = new ConcurrentHashSet(); } @@ -115,6 +118,10 @@ private async Task Execute() _activeTasks.Add(item, waitHandle); } var task = ExecuteItem(item, waitHandle, activity); + lock (_runningTasksLock) + { + _runningTasks.Add(task); + } } catch (OperationCanceledException) { @@ -138,6 +145,25 @@ private async Task Execute() foreach (var handle in toComplete) handle.WaitOne(); + + // Also await all running tasks to ensure proper async completion + Task[] tasksToAwait; + lock (_runningTasksLock) + { + tasksToAwait = _runningTasks.ToArray(); + } + + if (tasksToAwait.Length > 0) + { + try + { + await Task.WhenAll(tasksToAwait); + } + catch + { + // Individual task exceptions are already logged in ExecuteItem + } + } } private async Task ExecuteItem(string itemId, EventWaitHandle waitHandle, Activity activity) diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/StopAsyncScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/StopAsyncScenario.cs new file mode 100644 index 000000000..7339f736e --- /dev/null +++ b/test/WorkflowCore.IntegrationTests/Scenarios/StopAsyncScenario.cs @@ -0,0 +1,97 @@ +using System; +using System.Diagnostics; +using System.Threading.Tasks; +using WorkflowCore.Interface; +using WorkflowCore.Models; +using Xunit; +using FluentAssertions; +using Microsoft.Extensions.DependencyInjection; + +namespace WorkflowCore.IntegrationTests.Scenarios +{ + public class StopAsyncWorkflow : IWorkflow + { + internal static DateTime? StepStartTime = null; + internal static DateTime? StepEndTime = null; + + public string Id => "StopAsyncWorkflow"; + public int Version => 1; + public void Build(IWorkflowBuilder builder) + { + builder + .StartWith(); + } + } + + internal class LongRunningStep : StepBodyAsync + { + public override async Task RunAsync(IStepExecutionContext context) + { + StopAsyncWorkflow.StepStartTime = DateTime.Now; + await Task.Delay(5000); // 5 second delay + StopAsyncWorkflow.StepEndTime = DateTime.Now; + return ExecutionResult.Next(); + } + } + + public class StopAsyncScenario : IDisposable + { + protected IWorkflowHost Host; + protected IPersistenceProvider PersistenceProvider; + + public StopAsyncScenario() + { + //setup dependency injection + IServiceCollection services = new ServiceCollection(); + services.AddLogging(); + services.AddWorkflow(options => options.UsePollInterval(TimeSpan.FromSeconds(3))); + + var serviceProvider = services.BuildServiceProvider(); + + PersistenceProvider = serviceProvider.GetService(); + Host = serviceProvider.GetService(); + Host.RegisterWorkflow(); + Host.Start(); + } + + [Fact] + public async Task StopAsync_should_wait_for_running_steps_to_complete() + { + // Arrange + StopAsyncWorkflow.StepStartTime = null; + StopAsyncWorkflow.StepEndTime = null; + + // Start a workflow with a long-running step + var workflowId = await Host.StartWorkflow("StopAsyncWorkflow", null); + + // Wait for the step to start executing + var waitCount = 0; + while (StopAsyncWorkflow.StepStartTime == null && waitCount < 50) + { + await Task.Delay(100); + waitCount++; + } + + StopAsyncWorkflow.StepStartTime.Should().NotBeNull("the step should have started before stopping"); + StopAsyncWorkflow.StepEndTime.Should().BeNull("the step should still be running"); + + // Act - Call StopAsync which should wait for the step to complete + var stopwatch = Stopwatch.StartNew(); + await Host.StopAsync(default); + stopwatch.Stop(); + + // Assert + // The step should have completed + StopAsyncWorkflow.StepEndTime.Should().NotBeNull("the step should have completed before StopAsync returned"); + + // StopAsync should have taken at least 3 seconds (the remaining delay time) + stopwatch.ElapsedMilliseconds.Should().BeGreaterOrEqualTo(3000, + "StopAsync should wait for the running step to complete"); + } + + public void Dispose() + { + // Dispose is intentionally empty to avoid double-stop + } + } +}