Skip to content

Commit b4cc852

Browse files
committed
v1.0.4 - GenAI Integration
GenAI integration: document summarization via Google Gemini API GenAIWorker background service for summarization commands GeminiService with configurable retries and timeout handling GenAI event streaming via Server-Sent Events (SSE) New message types: GenAICommand, GenAIEvent Dependencies NEW: Polly 8.6.3 (retry policies)
1 parent 08a7183 commit b4cc852

31 files changed

+1605
-194
lines changed
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
namespace SWEN3.Paperless.RabbitMq.Tests.Helpers;
2+
3+
internal static class SseTestHelpers
4+
{
5+
public static TestServer CreateSseTestServer<T>(ISseStream<T> sseStream,
6+
Action<IEndpointRouteBuilder> configureEndpoints) where T : class
7+
{
8+
var builder = new WebHostBuilder().ConfigureServices(services =>
9+
{
10+
services.AddSingleton(sseStream);
11+
services.AddRouting();
12+
}).Configure(app =>
13+
{
14+
app.UseRouting();
15+
app.UseEndpoints(configureEndpoints);
16+
});
17+
18+
return new TestServer(builder);
19+
}
20+
}

SWEN3.Paperless.RabbitMq.Tests/Helpers/TestServerFactory.cs

Lines changed: 0 additions & 34 deletions
This file was deleted.
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
namespace SWEN3.Paperless.RabbitMq.Tests.Integration;
2+
3+
public class GenAIEventStreamIntegrationTests
4+
{
5+
[Theory]
6+
[InlineData("Completed", "genai-completed")]
7+
[InlineData("Failed", "genai-failed")]
8+
public async Task MapGenAIEventStream_ShouldEmitCorrectEventType(string status, string expectedEventType)
9+
{
10+
var sseStream = new SseStream<GenAIEvent>();
11+
using var server = SseTestHelpers.CreateSseTestServer(sseStream, endpoints => endpoints.MapGenAIEventStream());
12+
var client = server.CreateClient();
13+
14+
var readTask = Task.Run(async () =>
15+
{
16+
var response = await client.GetAsync("/api/v1/events/genai", HttpCompletionOption.ResponseHeadersRead);
17+
await using var stream = await response.Content.ReadAsStreamAsync();
18+
using var reader = new StreamReader(stream);
19+
20+
var line = await reader.ReadLineAsync();
21+
response.Dispose();
22+
return line;
23+
});
24+
25+
await Task.Delay(100, TestContext.Current.CancellationToken);
26+
27+
var genAiEvent = status == "Completed"
28+
? new GenAIEvent(Guid.NewGuid(), "Test summary", DateTimeOffset.UtcNow)
29+
: new GenAIEvent(Guid.NewGuid(), string.Empty, DateTimeOffset.UtcNow, "Service error");
30+
sseStream.Publish(genAiEvent);
31+
32+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
33+
var eventLine = await readTask.WaitAsync(cts.Token);
34+
35+
eventLine.Should().Be($"event: {expectedEventType}");
36+
}
37+
38+
[Fact]
39+
public async Task MapGenAIEventStream_WithMultipleEvents_ShouldStreamInOrder()
40+
{
41+
var sseStream = new SseStream<GenAIEvent>();
42+
using var server = SseTestHelpers.CreateSseTestServer(sseStream, endpoints => endpoints.MapGenAIEventStream());
43+
44+
var client = server.CreateClient();
45+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
46+
var token = cts.Token;
47+
var readTask = Task.Run(() => ReadEventsAsync(client, 3, token), token);
48+
49+
await Task.Delay(300, TestContext.Current.CancellationToken);
50+
sseStream.Publish(new GenAIEvent(Guid.NewGuid(), "Summary 1", DateTimeOffset.UtcNow));
51+
sseStream.Publish(new GenAIEvent(Guid.NewGuid(), "Summary 2", DateTimeOffset.UtcNow));
52+
sseStream.Publish(new GenAIEvent(Guid.NewGuid(), string.Empty, DateTimeOffset.UtcNow, "Error occurred"));
53+
54+
var events = await readTask.WaitAsync(TimeSpan.FromSeconds(2), TestContext.Current.CancellationToken);
55+
56+
events[0].Event.Should().Be("event: genai-completed");
57+
events[0].Data.Should().Contain("Summary 1");
58+
59+
events[1].Event.Should().Be("event: genai-completed");
60+
events[1].Data.Should().Contain("Summary 2");
61+
62+
events[2].Event.Should().Be("event: genai-failed");
63+
events[2].Data.Should().Contain("Error occurred");
64+
}
65+
66+
private static async Task<List<(string Event, string Data)>> ReadEventsAsync(HttpClient client, int count, CancellationToken cancellationToken = default)
67+
{
68+
using (client)
69+
{
70+
using var response =
71+
await client.GetAsync("/api/v1/events/genai", HttpCompletionOption.ResponseHeadersRead, cancellationToken);
72+
await using var stream = await response.Content.ReadAsStreamAsync(cancellationToken);
73+
using var reader = new StreamReader(stream);
74+
75+
var events = new List<(string Event, string Data)>();
76+
for (var i = 0; i < count; i++)
77+
{
78+
var eventLine = await reader.ReadLineAsync(cancellationToken);
79+
var dataLine = await reader.ReadLineAsync(cancellationToken);
80+
81+
if (eventLine == null || dataLine == null)
82+
throw new InvalidOperationException($"Unexpected end of stream while reading event {i + 1} of {count}");
83+
84+
events.Add((eventLine, dataLine));
85+
await reader.ReadLineAsync(cancellationToken);
86+
}
87+
88+
return events;
89+
}
90+
}
91+
}
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
namespace SWEN3.Paperless.RabbitMq.Tests.Integration;
2+
3+
public class GenAIIntegrationTests : IAsyncLifetime
4+
{
5+
private readonly RabbitMqContainer _container = new RabbitMqBuilder().Build();
6+
private IHost _host = null!;
7+
private IServiceProvider _serviceProvider = null!;
8+
9+
private IRabbitMqPublisher Publisher => _serviceProvider.GetRequiredService<IRabbitMqPublisher>();
10+
private IRabbitMqConsumerFactory ConsumerFactory => _serviceProvider.GetRequiredService<IRabbitMqConsumerFactory>();
11+
private IConnection Connection => _serviceProvider.GetRequiredService<IConnection>();
12+
13+
public async ValueTask InitializeAsync()
14+
{
15+
await _container.StartAsync();
16+
var hostBuilder = Host.CreateDefaultBuilder().ConfigureAppConfiguration((_, config) =>
17+
{
18+
config.AddInMemoryCollection(new Dictionary<string, string?>
19+
{
20+
["RabbitMQ:Uri"] = _container.GetConnectionString()
21+
});
22+
}).ConfigureServices((context, services) =>
23+
{
24+
services.AddPaperlessRabbitMq(context.Configuration);
25+
services.AddLogging();
26+
});
27+
_host = hostBuilder.Build();
28+
_serviceProvider = _host.Services;
29+
await _host.StartAsync();
30+
await Task.Delay(TimeSpan.FromSeconds(2));
31+
}
32+
33+
public async ValueTask DisposeAsync()
34+
{
35+
await _host.StopAsync();
36+
_host.Dispose();
37+
await _container.DisposeAsync();
38+
}
39+
40+
[Fact]
41+
public async Task GenAIEvent_ShouldPublishAndConsume_Successfully()
42+
{
43+
var testEvent = new GenAIEvent(Guid.NewGuid(), "This is a test summary for integration testing",
44+
DateTimeOffset.UtcNow);
45+
await Publisher.PublishGenAIEventAsync(testEvent);
46+
await Task.Delay(TimeSpan.FromMilliseconds(500), TestContext.Current.CancellationToken);
47+
48+
GenAIEvent? receivedEvent = null;
49+
await using var consumer = await ConsumerFactory.CreateConsumerAsync<GenAIEvent>();
50+
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
51+
await foreach (var message in consumer.ConsumeAsync(cts.Token))
52+
{
53+
receivedEvent = message;
54+
await consumer.AckAsync();
55+
break;
56+
}
57+
58+
receivedEvent.Should().NotBeNull();
59+
receivedEvent.DocumentId.Should().Be(testEvent.DocumentId);
60+
receivedEvent.Summary.Should().Be(testEvent.Summary);
61+
receivedEvent.GeneratedAt.Should().BeCloseTo(testEvent.GeneratedAt, TimeSpan.FromSeconds(1));
62+
receivedEvent.ErrorMessage.Should().BeNull();
63+
}
64+
65+
[Fact]
66+
public async Task GenAIEvent_WithError_ShouldPublishAndConsume_Successfully()
67+
{
68+
var testEvent = new GenAIEvent(Guid.NewGuid(), string.Empty, DateTimeOffset.UtcNow,
69+
"Failed to generate summary: API rate limit exceeded");
70+
await Publisher.PublishGenAIEventAsync(testEvent);
71+
await Task.Delay(TimeSpan.FromMilliseconds(500), TestContext.Current.CancellationToken);
72+
73+
GenAIEvent? receivedEvent = null;
74+
await using var consumer = await ConsumerFactory.CreateConsumerAsync<GenAIEvent>();
75+
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
76+
await foreach (var message in consumer.ConsumeAsync(cts.Token))
77+
{
78+
receivedEvent = message;
79+
await consumer.AckAsync();
80+
break;
81+
}
82+
83+
receivedEvent.Should().NotBeNull();
84+
receivedEvent.DocumentId.Should().Be(testEvent.DocumentId);
85+
receivedEvent.Summary.Should().BeEmpty();
86+
receivedEvent.ErrorMessage.Should().Be(testEvent.ErrorMessage);
87+
}
88+
89+
[Fact]
90+
public async Task GenAIEvent_MultipleEvents_ShouldProcessInOrder()
91+
{
92+
var events = Enumerable.Range(1, 3).Select(i => new GenAIEvent(Guid.NewGuid(),
93+
$"Summary {i}", DateTimeOffset.UtcNow.AddMinutes(i))).ToList();
94+
95+
await using var consumer = await ConsumerFactory.CreateConsumerAsync<GenAIEvent>();
96+
foreach (var evt in events)
97+
await Publisher.PublishGenAIEventAsync(evt);
98+
99+
var receivedEvents = new List<GenAIEvent>();
100+
using var cts = CancellationTokenSource.CreateLinkedTokenSource(TestContext.Current.CancellationToken);
101+
cts.CancelAfter(TimeSpan.FromSeconds(10));
102+
await foreach (var message in consumer.ConsumeAsync(cts.Token))
103+
{
104+
receivedEvents.Add(message);
105+
await consumer.AckAsync();
106+
if (receivedEvents.Count >= 3)
107+
break;
108+
}
109+
110+
receivedEvents.Should().HaveCount(3);
111+
receivedEvents.Should()
112+
.BeEquivalentTo(events, options => options.WithStrictOrdering().ComparingByMembers<GenAIEvent>());
113+
}
114+
115+
[Fact]
116+
public async Task GenAIQueue_ShouldExistInTopology()
117+
{
118+
await using var channel =
119+
await Connection.CreateChannelAsync(cancellationToken: TestContext.Current.CancellationToken);
120+
var result = await channel.QueueDeclarePassiveAsync(RabbitMqSchema.GenAIEventQueue,
121+
TestContext.Current.CancellationToken);
122+
123+
result.Should().NotBeNull();
124+
result.QueueName.Should().Be(RabbitMqSchema.GenAIEventQueue);
125+
}
126+
127+
[Fact]
128+
public async Task GenAIEvent_Nack_ShouldRequeue()
129+
{
130+
var testEvent = new GenAIEvent(Guid.NewGuid(), "Nack test", DateTimeOffset.UtcNow);
131+
await Publisher.PublishGenAIEventAsync(testEvent);
132+
await Task.Delay(TimeSpan.FromMilliseconds(500), TestContext.Current.CancellationToken);
133+
134+
await using (var consumer1 = await ConsumerFactory.CreateConsumerAsync<GenAIEvent>())
135+
{
136+
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
137+
await foreach (var _ in consumer1.ConsumeAsync(cts.Token))
138+
{
139+
await consumer1.NackAsync();
140+
break;
141+
}
142+
}
143+
144+
GenAIEvent? redeliveredEvent = null;
145+
await using (var consumer2 = await ConsumerFactory.CreateConsumerAsync<GenAIEvent>())
146+
{
147+
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
148+
await foreach (var message in consumer2.ConsumeAsync(cts.Token))
149+
{
150+
redeliveredEvent = message;
151+
await consumer2.AckAsync();
152+
break;
153+
}
154+
}
155+
156+
redeliveredEvent.Should().NotBeNull();
157+
redeliveredEvent.DocumentId.Should().Be(testEvent.DocumentId);
158+
redeliveredEvent.Summary.Should().Be(testEvent.Summary);
159+
}
160+
}
Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
namespace SWEN3.Paperless.RabbitMq.Tests.Integration;
22

3-
public class OcrEventStreamIntegrationTests
3+
public sealed class OcrEventStreamIntegrationTests
44
{
55
[Theory]
66
[InlineData("Completed", "ocr-completed")]
@@ -9,26 +9,29 @@ public class OcrEventStreamIntegrationTests
99
public async Task MapOcrEventStream_ShouldEmitCorrectEventType(string status, string expectedEventType)
1010
{
1111
var sseStream = new SseStream<OcrEvent>();
12-
var ocrEvent = new OcrEvent(Guid.NewGuid(), status, status is "Completed" ? "Text" : null,
13-
DateTimeOffset.UtcNow);
12+
var server = SseTestHelpers.CreateSseTestServer(sseStream, endpoints => endpoints.MapOcrEventStream());
13+
14+
var readTask = Task.Run(async () =>
15+
{
16+
using var client = server.CreateClient();
17+
var response = await client.GetAsync("/api/v1/ocr-results", HttpCompletionOption.ResponseHeadersRead);
18+
await using var stream = await response.Content.ReadAsStreamAsync();
19+
using var reader = new StreamReader(stream);
1420

15-
using var server = TestServerFactory.CreateSseTestServer(sseStream,
16-
endpoints => endpoints.MapOcrEventStream());
17-
using var client = server.CreateClient();
21+
var line = await reader.ReadLineAsync();
22+
response.Dispose();
23+
return line;
24+
});
1825

19-
var responseTask = client.GetAsync("/api/v1/ocr-results", HttpCompletionOption.ResponseHeadersRead,
20-
TestContext.Current.CancellationToken);
21-
await Task.Delay(100, TestContext.Current.CancellationToken);
26+
await Task.Delay(300, TestContext.Current.CancellationToken);
2227

28+
var ocrEvent = new OcrEvent(Guid.NewGuid(), status, status is "Completed" ? "Text" : null,
29+
DateTimeOffset.UtcNow);
2330
sseStream.Publish(ocrEvent);
2431

25-
var response = await responseTask;
26-
await using var stream = await response.Content.ReadAsStreamAsync(TestContext.Current.CancellationToken);
27-
using var reader = new StreamReader(stream, Encoding.UTF8);
32+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
33+
var eventLine = await readTask.WaitAsync(cts.Token);
2834

29-
var eventLine = await reader.ReadLineAsync(TestContext.Current.CancellationToken);
3035
eventLine.Should().Be($"event: {expectedEventType}");
31-
32-
response.Dispose();
3336
}
34-
}
37+
}

0 commit comments

Comments
 (0)