Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions src/Moryx.Runtime.Kernel/Modules/Components/ModuleStarter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,11 @@ public async Task StartAllAsync(CancellationToken cancellationToken)
ConvertBranch(root);
}

foreach (var module in depTree.RootModules.Where(ShouldBeStarted).Select(branch => branch.RepresentedModule))
var toBeStarted = depTree.RootModules.Where(ShouldBeStarted).Select(branch => branch.RepresentedModule);
await Parallel.ForEachAsync(toBeStarted, cancellationToken, async (module, token) =>
{
await StartModule(module, cancellationToken);
}
await StartModule(module, token);
});
}

private async Task StartModule(IServerModule module, CancellationToken cancellationToken)
Expand All @@ -84,7 +85,7 @@ private async Task StartModule(IServerModule module, CancellationToken cancellat
}
else
{
_ = Task.Run(async () => await ExecuteModuleStart(module, cancellationToken), cancellationToken);
await ExecuteModuleStart(module, cancellationToken);
}
}

Expand Down Expand Up @@ -115,15 +116,15 @@ private async Task ModuleChangedState(IServerModule module, ServerModuleState ne
// Now we start every service waiting on this service to return
await _waitingModulesSemaphore.ExecuteAsync(async () =>
{
if (!WaitingModules.TryGetValue(module, out var value))
if (!WaitingModules.TryGetValue(module, out var waitingModules))
return;

// To increase boot speed we fork module start if more than one dependent was found
foreach (var waitingModule in value.ToArray())
await Parallel.ForEachAsync(waitingModules.ToArray(), cancellationToken, async (waitingModule, token) =>
{
value.Remove(waitingModule);
await StartModule(waitingModule, cancellationToken);
}
waitingModules.Remove(waitingModule);
await StartModule(waitingModule, token);
});

// We remove this service for now after we started every dependent
WaitingModules.Remove(module);
Expand All @@ -141,11 +142,11 @@ private void ConvertBranch(IModuleDependency branch)

private async Task EnqueueServiceAndStartDependencies(IEnumerable<IServerModule> dependencies, IServerModule waitingService, CancellationToken cancellationToken)
{
foreach (var dependency in dependencies)
await Parallel.ForEachAsync(dependencies, cancellationToken, async (dependency, token) =>
{
AddWaitingModule(dependency, waitingService);
await StartAsync(dependency, cancellationToken);
}
await StartAsync(dependency, token);
});
}

private bool ShouldBeStarted(IModuleDependency plugin)
Expand Down
39 changes: 9 additions & 30 deletions src/Tests/Moryx.Runtime.Kernel.Tests/ModuleManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public async Task ShouldInitializeTheModule()
await moduleManager.InitializeModuleAsync(mockModule.Object);

// Assert
mockModule.Verify(mock => mock.InitializeAsync());
mockModule.Verify(mock => mock.InitializeAsync(It.IsAny<CancellationToken>()));
}

[Test]
Expand All @@ -208,14 +208,11 @@ public async Task ShouldStartAllModules()
// Act
await moduleManager.StartModulesAsync();

// Assert
Thread.Sleep(3000); // Give the thread pool some time

mockModule1.Verify(mock => mock.InitializeAsync(), Times.Exactly(2));
mockModule1.Verify(mock => mock.StartAsync());
mockModule1.Verify(mock => mock.InitializeAsync(It.IsAny<CancellationToken>()), Times.Exactly(2));
mockModule1.Verify(mock => mock.StartAsync(It.IsAny<CancellationToken>()));

mockModule2.Verify(mock => mock.InitializeAsync(), Times.Exactly(2));
mockModule2.Verify(mock => mock.StartAsync());
mockModule2.Verify(mock => mock.InitializeAsync(It.IsAny<CancellationToken>()), Times.Exactly(2));
mockModule2.Verify(mock => mock.StartAsync(It.IsAny<CancellationToken>()));
}

[Test]
Expand All @@ -229,11 +226,9 @@ public async Task ShouldStartOneModule()
// Act
await moduleManager.StartModuleAsync(mockModule.Object);

Thread.Sleep(1);

// Assert
mockModule.Verify(mock => mock.InitializeAsync());
mockModule.Verify(mock => mock.StartAsync());
mockModule.Verify(mock => mock.InitializeAsync(It.IsAny<CancellationToken>()));
mockModule.Verify(mock => mock.StartAsync(It.IsAny<CancellationToken>()));
}

[Test]
Expand All @@ -250,8 +245,8 @@ public async Task ShouldStopModulesAndDeregisterFromEvents()
await moduleManager.StopModulesAsync();

// Assert
mockModule1.Verify(mock => mock.StopAsync());
mockModule2.Verify(mock => mock.StopAsync());
mockModule1.Verify(mock => mock.StopAsync(It.IsAny<CancellationToken>()));
mockModule2.Verify(mock => mock.StopAsync(It.IsAny<CancellationToken>()));
}

[Test]
Expand Down Expand Up @@ -281,8 +276,6 @@ public async Task CheckLifeCycleBoundActivatedCountIs1()
// Act
await moduleManager.StartModulesAsync();

WaitForTimeboxed(() => module.State == ServerModuleState.Running);

// Assert
Assert.That(module.ActivatedCount, Is.EqualTo(1));
}
Expand All @@ -297,24 +290,10 @@ public async Task CheckLifeCycleBoundDeactivatedCountIs1()
// Act
await moduleManager.StartModulesAsync();

WaitForTimeboxed(() => module.State == ServerModuleState.Running);

await moduleManager.StopModulesAsync();

WaitForTimeboxed(() => module.State == ServerModuleState.Stopped);

// Assert
Assert.That(module.ActivatedCount, Is.EqualTo(1));
}

private static void WaitForTimeboxed(Func<bool> condition, int maxSeconds = 10)
{
var i = 0;
while (!condition() && (i < maxSeconds))
{
Thread.Sleep(100);
i++;
}
}
}
}
Loading