Skip to content

Commit 5af8d23

Browse files
committed
TPL dataflow action block
1 parent 643b082 commit 5af8d23

File tree

3 files changed

+29
-34
lines changed

3 files changed

+29
-34
lines changed
Lines changed: 26 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
using Microsoft.Extensions.Logging;
22
using System;
33
using System.Collections.Generic;
4-
using System.Text;
54
using System.Threading;
65
using System.Threading.Tasks;
6+
using System.Threading.Tasks.Dataflow;
77
using WorkflowCore.Interface;
88
using WorkflowCore.Models;
99

@@ -17,8 +17,7 @@ public abstract class QueueTaskDispatcher : IBackgroundTask
1717
protected readonly IQueueProvider QueueProvider;
1818
protected readonly ILogger Logger;
1919
protected readonly WorkflowOptions Options;
20-
protected Task DispatchTask;
21-
private SemaphoreSlim _semaphore;
20+
protected Task DispatchTask;
2221
private CancellationTokenSource _cancellationTokenSource;
2322

2423
protected QueueTaskDispatcher(IQueueProvider queueProvider, ILoggerFactory loggerFactory, WorkflowOptions options)
@@ -36,7 +35,7 @@ public virtual void Start()
3635
throw new InvalidOperationException();
3736

3837
_cancellationTokenSource = new CancellationTokenSource();
39-
_semaphore = new SemaphoreSlim(MaxConcurrentItems);
38+
4039
DispatchTask = new Task(Execute);
4140
DispatchTask.Start();
4241
}
@@ -45,41 +44,38 @@ public virtual void Stop()
4544
{
4645
_cancellationTokenSource.Cancel();
4746
DispatchTask.Wait();
48-
49-
for (var i = 0; i < MaxConcurrentItems; i++)
50-
_semaphore.Wait();
51-
5247
DispatchTask = null;
5348
}
5449

5550
private async void Execute()
5651
{
5752
var cancelToken = _cancellationTokenSource.Token;
53+
var opts = new ExecutionDataflowBlockOptions()
54+
{
55+
MaxDegreeOfParallelism = MaxConcurrentItems,
56+
BoundedCapacity = MaxConcurrentItems + 1
57+
};
58+
59+
var actionBlock = new ActionBlock<string>(ExecuteItem, opts);
60+
5861
while (!cancelToken.IsCancellationRequested)
5962
{
6063
try
6164
{
62-
await _semaphore.WaitAsync(cancelToken);
63-
string item;
64-
try
65-
{
66-
item = await QueueProvider.DequeueWork(Queue, cancelToken);
67-
}
68-
catch
65+
if (SpinWait.SpinUntil(() => actionBlock.InputCount == 0, Options.IdleTime))
6966
{
70-
_semaphore.Release();
71-
throw;
72-
}
67+
var item = await QueueProvider.DequeueWork(Queue, cancelToken);
7368

74-
if (item == null)
75-
{
76-
_semaphore.Release();
77-
if (!QueueProvider.IsDequeueBlocking)
78-
await Task.Delay(Options.IdleTime, cancelToken);
79-
continue;
80-
}
69+
if (item == null)
70+
{
71+
if (!QueueProvider.IsDequeueBlocking)
72+
await Task.Delay(Options.IdleTime, cancelToken);
73+
continue;
74+
}
8175

82-
new Task(() => ExecuteItem(item, cancelToken)).Start();
76+
if (!actionBlock.Post(item))
77+
await QueueProvider.QueueWork(item, Queue);
78+
}
8379
}
8480
catch (OperationCanceledException)
8581
{
@@ -89,13 +85,15 @@ private async void Execute()
8985
Logger.LogError(ex.Message);
9086
}
9187
}
88+
actionBlock.Complete();
89+
await actionBlock.Completion;
9290
}
9391

94-
private async void ExecuteItem(string itemId, CancellationToken cancellationToken)
92+
private async Task ExecuteItem(string itemId)
9593
{
9694
try
9795
{
98-
await ProcessItem(itemId, cancellationToken);
96+
await ProcessItem(itemId, _cancellationTokenSource.Token);
9997
}
10098
catch (OperationCanceledException)
10199
{
@@ -105,10 +103,6 @@ private async void ExecuteItem(string itemId, CancellationToken cancellationToke
105103
{
106104
Logger.LogError($"Error executing item {itemId} - {ex.Message}");
107105
}
108-
finally
109-
{
110-
_semaphore.Release();
111-
}
112106
}
113107
}
114108
}

src/WorkflowCore/WorkflowCore.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
<ItemGroup>
2828
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="1.1.0" />
2929
<PackageReference Include="Microsoft.Extensions.Logging" Version="1.1.1" />
30+
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.7.0" />
3031
<PackageReference Include="System.Threading.Tasks.Parallel" Version="4.3.0" />
3132
</ItemGroup>
3233

src/samples/WorkflowCore.Sample03/Steps/AddNumbers.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ public class AddNumbers : StepBodyAsync
1919
public override async Task<ExecutionResult> RunAsync(IStepExecutionContext context)
2020
{
2121
if (Input1 == 2)
22-
//await Task.Delay(2000);
23-
System.Threading.Thread.Sleep(2000);
22+
await Task.Delay(2000);
23+
//System.Threading.Thread.Sleep(2000);
2424

2525
Output = (Input1 + Input2);
2626
return ExecutionResult.Next();

0 commit comments

Comments
 (0)