Skip to content

Commit 71df05b

Browse files
imadityaaAditya Abhishek
andauthored
Add option for MinimumIterations in ParallelLoopExecution (#543)
* add min iteration to parallel loop execution * resolve merge conflict * nit * rename parameter --------- Co-authored-by: Aditya Abhishek <[email protected]>
1 parent 169a0f9 commit 71df05b

File tree

2 files changed

+184
-1
lines changed

2 files changed

+184
-1
lines changed
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
namespace VirtualClient.Contracts
5+
{
6+
using Microsoft.Extensions.DependencyInjection;
7+
using Moq;
8+
using NUnit.Framework;
9+
using System;
10+
using System.Collections.Generic;
11+
using System.Threading;
12+
using System.Threading.Tasks;
13+
using VirtualClient.Common.Telemetry;
14+
using VirtualClient.Contracts;
15+
16+
[TestFixture]
17+
[Category("Unit")]
18+
public class ParallelLoopExecutionTests
19+
{
20+
private MockFixture fixture;
21+
22+
[SetUp]
23+
public void SetupDefaults()
24+
{
25+
this.fixture = new MockFixture();
26+
this.fixture.Parameters = new Dictionary<string, IConvertible>
27+
{
28+
{ "Duration", "00:00:01" }, // Default duration for tests
29+
{ "MinimumIterations", 0 } // Default minimum iterations
30+
};
31+
}
32+
33+
[Test]
34+
public async Task ParallelLoopExecution_RespectsDurationParameter()
35+
{
36+
var component = new TestComponent(this.fixture.Dependencies, this.fixture.Parameters, async token =>
37+
{
38+
await Task.Delay(5000, token); // Simulate long-running task
39+
});
40+
41+
var collection = new TestParallelLoopExecution(this.fixture);
42+
collection.Add(component);
43+
44+
var sw = System.Diagnostics.Stopwatch.StartNew();
45+
await collection.ExecuteAsync(EventContext.None, CancellationToken.None);
46+
sw.Stop();
47+
48+
// Assert: Should not run for more than ~2 seconds (buffer for scheduling)
49+
Assert.LessOrEqual(sw.Elapsed.TotalSeconds, 2.5, "Execution did not respect the Duration parameter.");
50+
}
51+
52+
[Test]
53+
public async Task ParallelLoopExecution_RespectsMinimumIterationsParameterAndTimeout()
54+
{
55+
this.fixture.Parameters["MinimumIterations"] = 2;
56+
this.fixture.Parameters["Duration"] = "00:00:01";
57+
58+
var component = new TestComponent(this.fixture.Dependencies, this.fixture.Parameters, async token =>
59+
{
60+
await Task.Delay(600, token); // Simulate a small task
61+
});
62+
63+
var collection = new TestParallelLoopExecution(this.fixture);
64+
collection.Add(component);
65+
66+
using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)))
67+
{
68+
try
69+
{
70+
await collection.ExecuteAsync(EventContext.None, cts.Token);
71+
}
72+
catch { /* ignore */ }
73+
}
74+
75+
// Assert: Should run exactly 2 times, as each iteration takes 600ms,
76+
// Timeout is 1 second and Cancellation Token comes at 2 seconds
77+
Assert.AreEqual(component.ExecutionCount, 2, "Did not execute the minimum number of iterations.");
78+
}
79+
80+
[Test]
81+
public async Task ParallelLoopExecution_RespectsMinimumIterationsParameter()
82+
{
83+
this.fixture.Parameters["MinimumIterations"] = 7;
84+
85+
var component = new TestComponent(this.fixture.Dependencies, this.fixture.Parameters, token =>
86+
{
87+
return Task.CompletedTask;
88+
});
89+
90+
var collection = new TestParallelLoopExecution(this.fixture);
91+
collection.Add(component);
92+
93+
using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)))
94+
{
95+
try
96+
{
97+
await collection.ExecuteAsync(EventContext.None, cts.Token);
98+
}
99+
catch { /* ignore */ }
100+
}
101+
102+
// Assert: Should run at least MinimumIterations times
103+
Assert.GreaterOrEqual(component.ExecutionCount, 7, "Did not execute the minimum number of iterations.");
104+
}
105+
106+
[Test]
107+
public void ParallelLoopExecution_ThrowsWorkloadException_WhenComponentThrows()
108+
{
109+
var component = new TestComponent(this.fixture.Dependencies, this.fixture.Parameters, token =>
110+
{
111+
throw new InvalidOperationException("Test exception");
112+
});
113+
114+
var collection = new TestParallelLoopExecution(this.fixture);
115+
collection.Add(component);
116+
117+
var ex = Assert.ThrowsAsync<WorkloadException>(
118+
() => collection.ExecuteAsync(EventContext.None, CancellationToken.None));
119+
Assert.That(ex.Message, Does.Contain("task execution failed"));
120+
Assert.IsInstanceOf<InvalidOperationException>(ex.InnerException);
121+
}
122+
123+
private class TestComponent : VirtualClientComponent
124+
{
125+
private readonly Func<CancellationToken, Task> onExecuteAsync;
126+
127+
public int ExecutionCount { get; private set; }
128+
129+
public TestComponent(IServiceCollection dependencies, IDictionary<string, IConvertible> parameters, Func<CancellationToken, Task> onExecuteAsync = null)
130+
: base(dependencies, parameters)
131+
{
132+
this.onExecuteAsync = onExecuteAsync ?? (_ => Task.CompletedTask);
133+
}
134+
135+
protected override async Task ExecuteAsync(EventContext telemetryContext, CancellationToken cancellationToken)
136+
{
137+
this.ExecutionCount++;
138+
await this.onExecuteAsync(cancellationToken);
139+
}
140+
}
141+
142+
private class TestParallelLoopExecution : ParallelLoopExecution
143+
{
144+
public TestParallelLoopExecution(MockFixture fixture)
145+
: base(fixture.Dependencies, fixture.Parameters)
146+
{
147+
}
148+
149+
public new Task InitializeAsync(EventContext telemetryContext, CancellationToken cancellationToken)
150+
{
151+
return base.InitializeAsync(telemetryContext, cancellationToken);
152+
}
153+
154+
public new Task ExecuteAsync(EventContext telemetryContext, CancellationToken cancellationToken)
155+
{
156+
return base.ExecuteAsync(telemetryContext, cancellationToken);
157+
}
158+
}
159+
}
160+
}

src/VirtualClient/VirtualClient.Contracts/ParallelLoopExecution.cs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,17 @@ public TimeSpan Duration
4242
}
4343
}
4444

45+
/// <summary>
46+
/// The minimum number of times each child component should run. Default set to 0.
47+
/// </summary>
48+
public int MinimumIterations
49+
{
50+
get
51+
{
52+
return this.Parameters.GetValue<int>(nameof(this.MinimumIterations), 0);
53+
}
54+
}
55+
4556
/// <summary>
4657
/// Executes all of the child components continuously in parallel, respecting the specified timeout.
4758
/// </summary>
@@ -72,10 +83,17 @@ protected override Task ExecuteAsync(EventContext telemetryContext, Cancellation
7283
/// </summary>
7384
private async Task ExecuteComponentLoopAsync(VirtualClientComponent component, EventContext telemetryContext, CancellationToken cancellationToken)
7485
{
86+
int iterationCount = 0;
7587
while (!cancellationToken.IsCancellationRequested)
7688
{
7789
try
7890
{
91+
if (this.timeoutTask.IsCompleted && iterationCount >= this.MinimumIterations)
92+
{
93+
this.Logger.LogMessage($"Stopping {nameof(ParallelLoopExecution)} after Timeout of '{this.Duration}'", LogLevel.Information, telemetryContext);
94+
break;
95+
}
96+
7997
string scenarioMessage = string.IsNullOrWhiteSpace(component.Scenario)
8098
? $"{nameof(ParallelLoopExecution)} Component = {component.TypeName}"
8199
: $"{nameof(ParallelLoopExecution)} Component = {component.TypeName} (scenario={component.Scenario})";
@@ -84,14 +102,19 @@ private async Task ExecuteComponentLoopAsync(VirtualClientComponent component, E
84102

85103
// Execute the component task with timeout handling.
86104
Task componentExecutionTask = component.ExecuteAsync(cancellationToken);
105+
87106
Task completedTask = await Task.WhenAny(componentExecutionTask, this.timeoutTask);
88107

89-
if (completedTask == this.timeoutTask)
108+
if (completedTask == this.timeoutTask && iterationCount >= this.MinimumIterations)
90109
{
91110
break;
92111
}
93112

94113
await componentExecutionTask;
114+
115+
iterationCount++;
116+
117+
this.Logger.LogMessage($"Iteration {iterationCount} completed for component {component.TypeName}", LogLevel.Information, telemetryContext);
95118
}
96119
catch (Exception ex)
97120
{

0 commit comments

Comments
 (0)