Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/WorkflowCore/Services/BackgroundTasks/QueueConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ internal abstract class QueueConsumer : IBackgroundTask
protected Task DispatchTask;
private CancellationTokenSource _cancellationTokenSource;
private Dictionary<string, EventWaitHandle> _activeTasks;
private List<Task> _runningTasks;
private readonly object _runningTasksLock = new object();
private ConcurrentHashSet<string> _secondPasses;

protected QueueConsumer(IQueueProvider queueProvider, ILoggerFactory loggerFactory, WorkflowOptions options)
Expand All @@ -33,6 +35,7 @@ protected QueueConsumer(IQueueProvider queueProvider, ILoggerFactory loggerFacto
Logger = loggerFactory.CreateLogger(GetType());

_activeTasks = new Dictionary<string, EventWaitHandle>();
_runningTasks = new List<Task>();
_secondPasses = new ConcurrentHashSet<string>();
}

Expand Down Expand Up @@ -115,6 +118,10 @@ private async Task Execute()
_activeTasks.Add(item, waitHandle);
}
var task = ExecuteItem(item, waitHandle, activity);
lock (_runningTasksLock)
{
_runningTasks.Add(task);
}
}
catch (OperationCanceledException)
{
Expand All @@ -138,6 +145,25 @@ private async Task Execute()

foreach (var handle in toComplete)
handle.WaitOne();

// Also await all running tasks to ensure proper async completion
Task[] tasksToAwait;
lock (_runningTasksLock)
{
tasksToAwait = _runningTasks.ToArray();
}

if (tasksToAwait.Length > 0)
{
try
{
await Task.WhenAll(tasksToAwait);
}
catch
{
// Individual task exceptions are already logged in ExecuteItem
}
}
}

private async Task ExecuteItem(string itemId, EventWaitHandle waitHandle, Activity activity)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using Xunit;
using FluentAssertions;
using Microsoft.Extensions.DependencyInjection;

namespace WorkflowCore.IntegrationTests.Scenarios
{
public class StopAsyncWorkflow : IWorkflow
{
internal static DateTime? StepStartTime = null;
internal static DateTime? StepEndTime = null;

public string Id => "StopAsyncWorkflow";
public int Version => 1;
public void Build(IWorkflowBuilder<Object> builder)
{
builder
.StartWith<LongRunningStep>();
}
}

internal class LongRunningStep : StepBodyAsync
{
public override async Task<ExecutionResult> RunAsync(IStepExecutionContext context)
{
StopAsyncWorkflow.StepStartTime = DateTime.Now;
await Task.Delay(5000); // 5 second delay
StopAsyncWorkflow.StepEndTime = DateTime.Now;
return ExecutionResult.Next();
}
}

public class StopAsyncScenario : IDisposable
{
protected IWorkflowHost Host;
protected IPersistenceProvider PersistenceProvider;

public StopAsyncScenario()
{
//setup dependency injection
IServiceCollection services = new ServiceCollection();
services.AddLogging();
services.AddWorkflow(options => options.UsePollInterval(TimeSpan.FromSeconds(3)));

var serviceProvider = services.BuildServiceProvider();

PersistenceProvider = serviceProvider.GetService<IPersistenceProvider>();
Host = serviceProvider.GetService<IWorkflowHost>();
Host.RegisterWorkflow<StopAsyncWorkflow, Object>();
Host.Start();
}

[Fact]
public async Task StopAsync_should_wait_for_running_steps_to_complete()
{
// Arrange
StopAsyncWorkflow.StepStartTime = null;
StopAsyncWorkflow.StepEndTime = null;

// Start a workflow with a long-running step
var workflowId = await Host.StartWorkflow<Object>("StopAsyncWorkflow", null);

// Wait for the step to start executing
var waitCount = 0;
while (StopAsyncWorkflow.StepStartTime == null && waitCount < 50)
{
await Task.Delay(100);
waitCount++;
}

StopAsyncWorkflow.StepStartTime.Should().NotBeNull("the step should have started before stopping");
StopAsyncWorkflow.StepEndTime.Should().BeNull("the step should still be running");

// Act - Call StopAsync which should wait for the step to complete
var stopwatch = Stopwatch.StartNew();
await Host.StopAsync(default);
stopwatch.Stop();

// Assert
// The step should have completed
StopAsyncWorkflow.StepEndTime.Should().NotBeNull("the step should have completed before StopAsync returned");

// StopAsync should have taken at least 3 seconds (the remaining delay time)
stopwatch.ElapsedMilliseconds.Should().BeGreaterOrEqualTo(3000,
"StopAsync should wait for the running step to complete");
}

public void Dispose()
{
// Dispose is intentionally empty to avoid double-stop
}
}
}
Loading