Skip to content

Commit 5e25e8a

Browse files
authored
Multiple constraint keys (#407)
* Multiple constraint keys * Remove reference * Formatting Markdown * Greater than * Update TUnit * AssertAfter * delay * Assertion Tweaks --------- Co-authored-by: Tom Longhurst <thomhurst@users.noreply.github.com>
1 parent e0c6257 commit 5e25e8a

File tree

9 files changed

+220
-30
lines changed

9 files changed

+220
-30
lines changed

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ Define your pipeline in .NET! Strong types, intellisense, parallelisation, and t
6363
| ModularPipelines.WinGet | Helpers for interacting with the Windows Package Manager. | [![nuget](https://img.shields.io/nuget/v/ModularPipelines.WinGet.svg)](https://www.nuget.org/packages/ModularPipelines.WinGet/) |
6464
| ModularPipelines.Yarn | Helpers for interacting with Yarn CLI. | [![nuget](https://img.shields.io/nuget/v/ModularPipelines.Yarn.svg)](https://www.nuget.org/packages/ModularPipelines.Yarn/) |
6565

66-
6766
## Getting Started
6867

6968
If you want to see how to get started, or want to know more about ModularPipelines, [read the Documentation here](https://thomhurst.github.io/ModularPipelines)
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
* The NotInParallel attribute now supports multiple constraint keys. A module will not run concurrently if any other module is running at the same time with any of the same constraint keys

src/ModularPipelines/Attributes/NotInParallelAttribute.cs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,19 @@ public class NotInParallelAttribute : Attribute
1212
/// Other modules with a different constraint key can run in parallel still.
1313
/// If null or empty, then the module will not be run in parallel with any other module.
1414
/// </summary>
15-
public string? ConstraintKey { get; init; }
15+
public string[] ConstraintKeys { get; } = Array.Empty<string>();
16+
17+
public NotInParallelAttribute()
18+
{
19+
}
20+
21+
public NotInParallelAttribute(string constraintKey) : this([constraintKey])
22+
{
23+
ArgumentNullException.ThrowIfNull(constraintKey);
24+
}
25+
26+
public NotInParallelAttribute(params string[] constraintKeys)
27+
{
28+
ConstraintKeys = constraintKeys;
29+
}
1630
}

src/ModularPipelines/Engine/ModuleExecutor.cs

Lines changed: 80 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public async Task<IEnumerable<ModuleBase>> ExecuteAsync(IReadOnlyList<ModuleBase
4040
.ToList();
4141

4242
var unKeyedNonParallelModules = nonParallelModules
43-
.Where(x => string.IsNullOrEmpty(x.GetType().GetCustomAttribute<NotInParallelAttribute>()!.ConstraintKey))
43+
.Where(x => x.GetType().GetCustomAttribute<NotInParallelAttribute>()!.ConstraintKeys.Length == 0)
4444
.ToList();
4545

4646
foreach (var nonParallelModule in unKeyedNonParallelModules)
@@ -49,39 +49,106 @@ public async Task<IEnumerable<ModuleBase>> ExecuteAsync(IReadOnlyList<ModuleBase
4949
}
5050

5151
var keyedNonParallelModules = nonParallelModules
52-
.Where(x => !string.IsNullOrEmpty(x.GetType().GetCustomAttribute<NotInParallelAttribute>()!.ConstraintKey))
52+
.Where(x => x.GetType().GetCustomAttribute<NotInParallelAttribute>()!.ConstraintKeys.Length != 0)
5353
.ToList();
5454

55-
var groupResults = await keyedNonParallelModules
56-
.Concat(modules.Except(unKeyedNonParallelModules))
57-
.GroupBy(x => x.GetType().GetCustomAttribute<NotInParallelAttribute>()?.ConstraintKey)
58-
.SelectAsync(x => ProcessGroup(x, moduleResults))
59-
.ProcessInParallel();
55+
moduleResults.AddRange(
56+
await ProcessKeyedNonParallelModules(keyedNonParallelModules.ToList(), moduleResults)
57+
);
6058

61-
moduleResults.AddRange(groupResults.SelectMany(x => x));
62-
63-
var moduleTasks = modules.Except(nonParallelModules).Select(ExecuteAsync).ToArray();
59+
var parallelModuleTasks = modules.Except(nonParallelModules)
60+
.Select(ExecuteAsync)
61+
.ToArray();
6462

6563
if (_pipelineOptions.Value.ExecutionMode == ExecutionMode.StopOnFirstException)
6664
{
67-
moduleResults.AddRange(await moduleTasks.WhenAllFailFast());
65+
moduleResults.AddRange(await parallelModuleTasks.WhenAllFailFast());
6866
}
6967
else
7068
{
71-
moduleResults.AddRange(await Task.WhenAll(moduleTasks));
69+
moduleResults.AddRange(await Task.WhenAll(parallelModuleTasks));
7270
}
7371

7472
return moduleResults;
7573
}
7674

77-
public Task<ModuleBase> ExecuteAsync(ModuleBase module)
75+
public async Task<ModuleBase> ExecuteAsync(ModuleBase module)
76+
{
77+
try
78+
{
79+
return await ExecuteWithLockAsync(module);
80+
}
81+
catch (TaskCanceledException)
82+
{
83+
// If the pipeline failed, sometimes a TaskCanceledException can throw before the original exception
84+
// So delay a bit to let the original exception throw first
85+
await Task.Delay(TimeSpan.FromMilliseconds(500));
86+
throw;
87+
}
88+
}
89+
90+
private Task<ModuleBase> ExecuteWithLockAsync(ModuleBase module)
7891
{
7992
lock (module)
8093
{
8194
return _moduleExecutionTasks.GetOrAdd(module, @base => StartModule(module));
8295
}
8396
}
8497

98+
private async Task<ModuleBase[]> ProcessKeyedNonParallelModules(List<ModuleBase> keyedNonParallelModules,
99+
List<ModuleBase> moduleResults)
100+
{
101+
var currentlyExecutingByKeysLock = new object();
102+
var currentlyExecutingByKeys = new List<(string[] Keys, Task)>();
103+
104+
var executing = new List<Task<ModuleBase>>();
105+
106+
while (keyedNonParallelModules.Count > 0)
107+
{
108+
// Reversing allows us to remove from the collection
109+
for (var i = keyedNonParallelModules.Count - 1; i >= 0; i--)
110+
{
111+
var module = keyedNonParallelModules[i];
112+
113+
var notInParallelKeys =
114+
module.GetType().GetCustomAttribute<NotInParallelAttribute>()!.ConstraintKeys;
115+
116+
lock (currentlyExecutingByKeysLock)
117+
{
118+
if (currentlyExecutingByKeys.Any(x => x.Keys.Intersect(notInParallelKeys).Any()))
119+
{
120+
// There are currently executing tasks with that same
121+
continue;
122+
}
123+
}
124+
125+
// Remove from collection as we're now processing it
126+
keyedNonParallelModules.RemoveAt(i);
127+
128+
var executionTask = ExecuteAsync(module);
129+
130+
var tuple = (notInParallelKeys, executionTask);
131+
132+
lock (currentlyExecutingByKeysLock)
133+
{
134+
currentlyExecutingByKeys.Add(tuple);
135+
}
136+
137+
_ = executionTask.ContinueWith(_ =>
138+
{
139+
lock (currentlyExecutingByKeysLock)
140+
{
141+
return currentlyExecutingByKeys.Remove(tuple);
142+
}
143+
});
144+
145+
executing.Add(executionTask);
146+
}
147+
}
148+
149+
return await Task.WhenAll(executing);
150+
}
151+
85152
private async Task<ModuleBase> StartModule(ModuleBase module)
86153
{
87154
if (module.IsStarted || module.ExecutionTask.IsCompleted)

test/ModularPipelines.Azure.UnitTests/ModularPipelines.Azure.UnitTests.csproj

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
<ItemGroup>
1212
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.1.0"/>
1313
<PackageReference Include="coverlet.collector" Version="3.1.2"/>
14-
<PackageReference Include="TUnit" Version="0.1.96-alpha01" />
1514
</ItemGroup>
1615

1716
<ItemGroup>

test/ModularPipelines.TestHelpers/ModularPipelines.TestHelpers.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
</ItemGroup>
1212

1313
<ItemGroup>
14-
<PackageReference Include="TUnit" Version="0.1.96-alpha01" />
14+
<PackageReference Include="TUnit" Version="0.1.104-alpha01" />
1515
<PackageReference Include="Azure.ResourceManager" Version="1.10.1" />
1616
<PackageReference Include="Azure.Identity" Version="1.10.4" />
1717
</ItemGroup>

test/ModularPipelines.UnitTests/ModularPipelines.UnitTests.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.8.0" />
1414
<PackageReference Include="Moq" Version="4.20.70" />
1515
<PackageReference Include="NReco.Logging.File" Version="1.1.7" />
16-
<PackageReference Include="TUnit" Version="0.1.96-alpha01" />
16+
<PackageReference Include="TUnit" Version="0.1.104-alpha01" />
1717
<PackageReference Include="coverlet.collector" Version="6.0.0">
1818
<PrivateAssets>all</PrivateAssets>
1919
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>

test/ModularPipelines.UnitTests/NotInParallelTestsWithConstraintKeys.cs

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ namespace ModularPipelines.UnitTests;
66

77
public class NotInParallelTestsWithConstraintKeys : TestBase
88
{
9-
[ModularPipelines.Attributes.NotInParallel(ConstraintKey = "A")]
9+
[ModularPipelines.Attributes.NotInParallel("A")]
1010
public class ModuleWithAConstraintKey1 : Module<string>
1111
{
1212
protected override async Task<string?> ExecuteAsync(IPipelineContext context, CancellationToken cancellationToken)
@@ -16,7 +16,7 @@ public class ModuleWithAConstraintKey1 : Module<string>
1616
}
1717
}
1818

19-
[ModularPipelines.Attributes.NotInParallel(ConstraintKey = "A")]
19+
[ModularPipelines.Attributes.NotInParallel("A")]
2020
public class ModuleWithAConstraintKey2 : Module<string>
2121
{
2222
protected override async Task<string?> ExecuteAsync(IPipelineContext context, CancellationToken cancellationToken)
@@ -26,7 +26,7 @@ public class ModuleWithAConstraintKey2 : Module<string>
2626
}
2727
}
2828

29-
[ModularPipelines.Attributes.NotInParallel(ConstraintKey = "B")]
29+
[ModularPipelines.Attributes.NotInParallel("B")]
3030
public class ModuleWithBConstraintKey1 : Module<string>
3131
{
3232
protected override async Task<string?> ExecuteAsync(IPipelineContext context, CancellationToken cancellationToken)
@@ -36,7 +36,7 @@ public class ModuleWithBConstraintKey1 : Module<string>
3636
}
3737
}
3838

39-
[ModularPipelines.Attributes.NotInParallel(ConstraintKey = "B")]
39+
[ModularPipelines.Attributes.NotInParallel("B")]
4040
public class ModuleWithBConstraintKey2 : Module<string>
4141
{
4242
protected override async Task<string?> ExecuteAsync(IPipelineContext context, CancellationToken cancellationToken)
@@ -61,22 +61,35 @@ public async Task NotInParallel_If_Same_ConstraintKey()
6161
var b1 = results.Modules.OfType<ModuleWithBConstraintKey1>().First();
6262
var b2 = results.Modules.OfType<ModuleWithBConstraintKey2>().First();
6363

64-
await AssertAfter(a1, a2, TimeSpan.FromSeconds(1));
65-
await AssertAfter(b1, b2, TimeSpan.FromSeconds(1));
64+
await AssertAfter(a1, a2);
65+
await AssertAfter(b1, b2);
6666

6767
await AssertParallel(a1, b1);
6868
await AssertParallel(a2, b2);
6969
}
7070

71-
private async Task AssertAfter(ModuleBase firstModule, ModuleBase nextModule, TimeSpan expectedTimeAfter)
71+
private async Task AssertAfter(ModuleBase one, ModuleBase two)
7272
{
73-
await Assert.That(nextModule.StartTime)
74-
.Is.EqualToWithTolerance((firstModule.StartTime + expectedTimeAfter), TimeSpan.FromMilliseconds(350));
73+
var modules = new[] { one, two };
74+
var firstModule = modules.OrderBy(x => x.StartTime).First();
75+
var secondModule = modules.OrderBy(x => x.StartTime).Last();
76+
77+
await Assert.That(secondModule.StartTime)
78+
.Is
79+
.GreaterThan(firstModule.StartTime + TimeSpan.FromSeconds(1));
7580
}
7681

77-
private async Task AssertParallel(ModuleBase firstModule, ModuleBase nextModule)
82+
private async Task AssertParallel(ModuleBase one, ModuleBase two)
7883
{
79-
await Assert.That(nextModule.StartTime).
80-
Is.EqualToWithTolerance(firstModule.StartTime, TimeSpan.FromMilliseconds(350));
84+
var modules = new[] { one, two };
85+
var firstModule = modules.OrderBy(x => x.StartTime).First();
86+
var secondModule = modules.OrderBy(x => x.StartTime).Last();
87+
88+
await Assert.That(secondModule.StartTime)
89+
.Is
90+
.GreaterThanOrEqualTo(firstModule.StartTime)
91+
.And
92+
.Is
93+
.LessThanOrEqualTo(firstModule.EndTime);
8194
}
8295
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
using ModularPipelines.Context;
2+
using ModularPipelines.Modules;
3+
using ModularPipelines.TestHelpers;
4+
5+
namespace ModularPipelines.UnitTests;
6+
7+
[Repeat(5)]
8+
public class NotInParallelTestsWithMultipleConstraintKeys : TestBase
9+
{
10+
[ModularPipelines.Attributes.NotInParallel("A")]
11+
public class Module1 : Module<string>
12+
{
13+
protected override async Task<string?> ExecuteAsync(IPipelineContext context, CancellationToken cancellationToken)
14+
{
15+
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
16+
return GetType().Name;
17+
}
18+
}
19+
20+
[ModularPipelines.Attributes.NotInParallel("A", "B")]
21+
public class Module2 : Module<string>
22+
{
23+
protected override async Task<string?> ExecuteAsync(IPipelineContext context, CancellationToken cancellationToken)
24+
{
25+
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
26+
return GetType().Name;
27+
}
28+
}
29+
30+
[ModularPipelines.Attributes.NotInParallel("B", "C")]
31+
public class Module3 : Module<string>
32+
{
33+
protected override async Task<string?> ExecuteAsync(IPipelineContext context, CancellationToken cancellationToken)
34+
{
35+
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
36+
return GetType().Name;
37+
}
38+
}
39+
40+
[ModularPipelines.Attributes.NotInParallel("D")]
41+
public class Module4 : Module<string>
42+
{
43+
protected override async Task<string?> ExecuteAsync(IPipelineContext context, CancellationToken cancellationToken)
44+
{
45+
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
46+
return GetType().Name;
47+
}
48+
}
49+
50+
[Test]
51+
public async Task NotInParallel_If_Any_Modules_Executing_With_Any_Of_Same_ConstraintKey()
52+
{
53+
var resultsTask = TestPipelineHostBuilder.Create()
54+
.AddModule<Module1>()
55+
.AddModule<Module2>()
56+
.AddModule<Module3>()
57+
.AddModule<Module4>()
58+
.ExecutePipelineAsync();
59+
60+
var results = await resultsTask;
61+
62+
var one = results.Modules.OfType<Module1>().First();
63+
var two = results.Modules.OfType<Module2>().First();
64+
var three = results.Modules.OfType<Module3>().First();
65+
var four = results.Modules.OfType<Module4>().First();
66+
67+
await AssertAfter(one, two);
68+
69+
await AssertParallel(one, three);
70+
await AssertParallel(one, four);
71+
}
72+
73+
private async Task AssertAfter(ModuleBase one, ModuleBase two)
74+
{
75+
var modules = new[] { one, two };
76+
var firstModule = modules.OrderBy(x => x.StartTime).First();
77+
var secondModule = modules.OrderBy(x => x.StartTime).Last();
78+
79+
await Assert.That(secondModule.StartTime)
80+
.Is
81+
.GreaterThan(firstModule.StartTime + TimeSpan.FromSeconds(1));
82+
}
83+
84+
private async Task AssertParallel(ModuleBase one, ModuleBase two)
85+
{
86+
var modules = new[] { one, two };
87+
var firstModule = modules.OrderBy(x => x.StartTime).First();
88+
var secondModule = modules.OrderBy(x => x.StartTime).Last();
89+
90+
await Assert.That(secondModule.StartTime)
91+
.Is
92+
.GreaterThanOrEqualTo(firstModule.StartTime)
93+
.And
94+
.Is
95+
.LessThanOrEqualTo(firstModule.EndTime);
96+
}
97+
}

0 commit comments

Comments
 (0)