Skip to content

Commit b8af93a

Browse files
committed
issue #645
1 parent 18be33b commit b8af93a

File tree

2 files changed

+44
-50
lines changed

2 files changed

+44
-50
lines changed

src/WorkflowCore/Services/BackgroundTasks/QueueConsumer.cs

Lines changed: 40 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,17 @@ internal abstract class QueueConsumer : IBackgroundTask
2121
protected readonly WorkflowOptions Options;
2222
protected Task DispatchTask;
2323
private CancellationTokenSource _cancellationTokenSource;
24+
private Dictionary<string, EventWaitHandle> _activeTasks;
25+
private ConcurrentHashSet<string> _secondPasses;
2426

2527
protected QueueConsumer(IQueueProvider queueProvider, ILoggerFactory loggerFactory, WorkflowOptions options)
2628
{
2729
QueueProvider = queueProvider;
2830
Options = options;
2931
Logger = loggerFactory.CreateLogger(GetType());
32+
33+
_activeTasks = new Dictionary<string, EventWaitHandle>();
34+
_secondPasses = new ConcurrentHashSet<string>();
3035
}
3136

3237
protected abstract Task ProcessItem(string itemId, CancellationToken cancellationToken);
@@ -39,9 +44,8 @@ public virtual void Start()
3944
}
4045

4146
_cancellationTokenSource = new CancellationTokenSource();
42-
43-
DispatchTask = new Task(Execute, TaskCreationOptions.LongRunning);
44-
DispatchTask.Start();
47+
48+
DispatchTask = Task.Factory.StartNew(Execute, TaskCreationOptions.LongRunning);
4549
}
4650

4751
public virtual void Stop()
@@ -51,20 +55,18 @@ public virtual void Stop()
5155
DispatchTask = null;
5256
}
5357

54-
private async void Execute()
58+
private async Task Execute()
5559
{
56-
var cancelToken = _cancellationTokenSource.Token;
57-
var activeTasks = new Dictionary<string, Task>();
58-
var secondPasses = new ConcurrentHashSet<string>();
60+
var cancelToken = _cancellationTokenSource.Token;
5961

6062
while (!cancelToken.IsCancellationRequested)
6163
{
6264
try
6365
{
6466
var activeCount = 0;
65-
lock (activeTasks)
67+
lock (_activeTasks)
6668
{
67-
activeCount = activeTasks.Count;
69+
activeCount = _activeTasks.Count;
6870
}
6971
if (activeCount >= MaxConcurrentItems)
7072
{
@@ -82,45 +84,26 @@ private async void Execute()
8284
}
8385

8486
var hasTask = false;
85-
lock (activeTasks)
87+
lock (_activeTasks)
8688
{
87-
hasTask = activeTasks.ContainsKey(item);
89+
hasTask = _activeTasks.ContainsKey(item);
8890
}
8991
if (hasTask)
9092
{
91-
secondPasses.Add(item);
93+
_secondPasses.Add(item);
9294
if (!EnableSecondPasses)
9395
await QueueProvider.QueueWork(item, Queue);
9496
continue;
9597
}
9698

97-
secondPasses.TryRemove(item);
99+
_secondPasses.TryRemove(item);
98100

99-
var task = new Task(async (object data) =>
100-
{
101-
try
102-
{
103-
await ExecuteItem((string)data);
104-
while (EnableSecondPasses && secondPasses.Contains(item))
105-
{
106-
secondPasses.TryRemove(item);
107-
await ExecuteItem((string)data);
108-
}
109-
}
110-
finally
111-
{
112-
lock (activeTasks)
113-
{
114-
activeTasks.Remove((string)data);
115-
}
116-
}
117-
}, item);
118-
lock (activeTasks)
101+
var waitHandle = new ManualResetEvent(false);
102+
lock (_activeTasks)
119103
{
120-
activeTasks.Add(item, task);
104+
_activeTasks.Add(item, waitHandle);
121105
}
122-
123-
task.Start();
106+
var task = ExecuteItem(item, waitHandle);
124107
}
125108
catch (OperationCanceledException)
126109
{
@@ -131,21 +114,26 @@ private async void Execute()
131114
}
132115
}
133116

134-
List<Task> toComplete;
135-
lock (activeTasks)
117+
List<EventWaitHandle> toComplete;
118+
lock (_activeTasks)
136119
{
137-
toComplete = activeTasks.Values.ToList();
120+
toComplete = _activeTasks.Values.ToList();
138121
}
139-
140-
foreach (var task in toComplete)
141-
task.Wait();
122+
123+
foreach (var handle in toComplete)
124+
handle.WaitOne();
142125
}
143126

144-
private Task ExecuteItem(string itemId)
127+
private async Task ExecuteItem(string itemId, EventWaitHandle waitHandle)
145128
{
146129
try
147130
{
148-
ProcessItem(itemId, _cancellationTokenSource.Token).Wait();
131+
await ProcessItem(itemId, _cancellationTokenSource.Token);
132+
while (EnableSecondPasses && _secondPasses.Contains(itemId))
133+
{
134+
_secondPasses.TryRemove(itemId);
135+
await ProcessItem(itemId, _cancellationTokenSource.Token);
136+
}
149137
}
150138
catch (OperationCanceledException)
151139
{
@@ -155,8 +143,14 @@ private Task ExecuteItem(string itemId)
155143
{
156144
Logger.LogError(default(EventId), ex, $"Error executing item {itemId} - {ex.Message}");
157145
}
158-
159-
return Task.CompletedTask;
146+
finally
147+
{
148+
waitHandle.Set();
149+
lock (_activeTasks)
150+
{
151+
_activeTasks.Remove(itemId);
152+
}
153+
}
160154
}
161155
}
162156
}

src/WorkflowCore/WorkflowCore.csproj

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@
1515
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
1616
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
1717
<Description>Workflow Core is a light weight workflow engine targeting .NET Standard.</Description>
18-
<Version>3.2.4</Version>
19-
<AssemblyVersion>3.2.4.0</AssemblyVersion>
20-
<FileVersion>3.2.4.0</FileVersion>
18+
<Version>3.2.5</Version>
19+
<AssemblyVersion>3.2.5.0</AssemblyVersion>
20+
<FileVersion>3.2.5.0</FileVersion>
2121
<PackageReleaseNotes></PackageReleaseNotes>
2222
<PackageIconUrl>https://github.com/danielgerlag/workflow-core/raw/master/src/logo.png</PackageIconUrl>
23-
<PackageVersion>3.2.4</PackageVersion>
23+
<PackageVersion>3.2.5</PackageVersion>
2424
</PropertyGroup>
2525

2626
<ItemGroup>

0 commit comments

Comments
 (0)