Skip to content

Commit 04fe123

Browse files
committed
added parallel executor
1 parent df41b6a commit 04fe123

File tree

2 files changed

+93
-0
lines changed

2 files changed

+93
-0
lines changed
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
using Moq;
2+
3+
namespace KernelMemory.Extensions.FunctionalTests.QueryPipeline;
4+
5+
public class ParallelQueryHandlerTests
6+
{
7+
[Fact]
8+
public void Constructor_WithValidParameters_CreatesInstance()
9+
{
10+
// Arrange & Act
11+
var handler = new ParallelQueryHandler("test");
12+
13+
// Assert
14+
Assert.Equal("test", handler.Name);
15+
}
16+
17+
[Fact]
18+
public async Task HandleAsync_WithMultipleHandlers_CallsAllHandlers()
19+
{
20+
// Arrange
21+
var mock1 = new Mock<IQueryHandler>();
22+
var mock2 = new Mock<IQueryHandler>();
23+
var handler = new ParallelQueryHandler("test", mock1.Object, mock2.Object);
24+
var question = GetQuestion();
25+
26+
// Act
27+
await handler.HandleAsync(question, CancellationToken.None);
28+
29+
// Assert
30+
mock1.Verify(x => x.HandleAsync(question, CancellationToken.None), Times.Once);
31+
mock2.Verify(x => x.HandleAsync(question, CancellationToken.None), Times.Once);
32+
}
33+
34+
private static UserQuestion GetQuestion()
35+
{
36+
return new UserQuestion(new UserQueryOptions("test index"), "question");
37+
}
38+
39+
[Fact]
40+
public async Task HandleAsync_WithNoHandlers_CompletesSuccessfully()
41+
{
42+
// Arrange
43+
var handler = new ParallelQueryHandler("test");
44+
var question = GetQuestion();
45+
46+
// Act & Assert
47+
await handler.HandleAsync(question, CancellationToken.None);
48+
}
49+
50+
[Fact]
51+
public async Task HandleAsync_WhenHandlerThrows_PropagatesException()
52+
{
53+
// Arrange
54+
var mock = new Mock<IQueryHandler>();
55+
mock.Setup(x => x.HandleAsync(It.IsAny<UserQuestion>(), It.IsAny<CancellationToken>()))
56+
.ThrowsAsync(new InvalidOperationException());
57+
var handler = new ParallelQueryHandler("test", mock.Object);
58+
59+
// Act & Assert
60+
await Assert.ThrowsAsync<InvalidOperationException>(() =>
61+
handler.HandleAsync(GetQuestion(), CancellationToken.None));
62+
}
63+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
using System.Collections.Generic;
2+
using System.Linq;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
6+
namespace KernelMemory.Extensions;
7+
8+
/// <summary>
9+
/// Useful when you want to wrap multiple query handlers
10+
/// and you want them to execute concurrently
11+
/// </summary>
12+
public class ParallelQueryHandler : BasicQueryHandler
13+
{
14+
private readonly IEnumerable<IQueryHandler> _handlers;
15+
private readonly string _name;
16+
17+
public ParallelQueryHandler(string name, params IQueryHandler[] handlers)
18+
{
19+
_name = name;
20+
_handlers = handlers;
21+
}
22+
23+
public override string Name => _name;
24+
25+
protected override async Task OnHandleAsync(UserQuestion userQuestion, CancellationToken cancellationToken)
26+
{
27+
var tasks = _handlers.Select(h => h.HandleAsync(userQuestion, cancellationToken));
28+
await Task.WhenAll(tasks);
29+
}
30+
}

0 commit comments

Comments
 (0)